Compare commits

...

15 Commits

Author SHA1 Message Date
Jędrzej Stuczyński b2ecc81da6 added ibc messages to MessageRegistry 2025-07-08 10:41:54 +01:00
Jędrzej Stuczyński 3547e1caf1 using SignerInfo proto definitions for db serialisation 2025-07-08 10:05:49 +01:00
Jędrzej Stuczyński 70590dd4f0 Revert "use postgres in chain scraper"
This reverts commit 83c84bfd2d.
2025-07-07 17:21:22 +01:00
Jędrzej Stuczyński aafa7ba14c adding null value for logs 2025-07-07 15:04:33 +01:00
Jędrzej Stuczyński 05f2079b64 involved addresses 2025-07-07 15:00:27 +01:00
Jędrzej Stuczyński 42c0b69f4d message content parsing in psql 2025-07-07 14:54:16 +01:00
Jędrzej Stuczyński 2320a2f249 added message registry to block processor 2025-07-07 14:36:53 +01:00
Jędrzej Stuczyński 83c84bfd2d use postgres in chain scraper 2025-07-04 18:15:24 +01:00
Jędrzej Stuczyński 71090c85c2 initial postgres support - missing some proto -> json parsing 2025-07-04 18:04:37 +01:00
Jędrzej Stuczyński 976961471b psql scaffolding 2025-07-04 12:45:06 +01:00
Jędrzej Stuczyński 655fd421a6 using sqlite instance for rewarder and chain watcher 2025-07-04 12:25:08 +01:00
Jędrzej Stuczyński 81eaf7b1cc implemented traits for sqlite instance 2025-07-04 12:09:45 +01:00
Jędrzej Stuczyński 027bd85200 changed error types to make modules dyn compatible 2025-07-04 11:23:25 +01:00
Jędrzej Stuczyński 08cff312af wip: made storage mostly generic minus modules 2025-07-04 11:08:05 +01:00
Jędrzej Stuczyński 2920e6ff01 rename nyxd-scraper to sqlite 2025-07-03 16:33:32 +01:00
110 changed files with 3807 additions and 569 deletions
Generated
+175 -20
View File
@@ -1355,7 +1355,7 @@ checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost 0.11.9",
"prost-types",
"tonic",
"tonic 0.9.2",
"tracing-core",
]
@@ -1377,7 +1377,7 @@ dependencies = [
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.9.2",
"tracing",
"tracing-core",
"tracing-subscriber",
@@ -1467,6 +1467,19 @@ dependencies = [
"tendermint-proto",
]
[[package]]
name = "cosmos-sdk-proto"
version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ac39be7373404accccaede7cc1ec942ccef14f0ca18d209967a756bf1dbb1f"
dependencies = [
"informalsystems-pbjson",
"prost 0.13.5",
"serde",
"tendermint-proto",
"tonic 0.13.1",
]
[[package]]
name = "cosmrs"
version = "0.21.1"
@@ -1474,7 +1487,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1394c263335da09e8ba8c4b2c675d804e3e0deb44cce0866a5f838d3ddd43d02"
dependencies = [
"bip32",
"cosmos-sdk-proto",
"cosmos-sdk-proto 0.26.1",
"ecdsa",
"eyre",
"k256",
"rand_core 0.6.4",
"serde",
"serde_json",
"signature",
"subtle-encoding",
"tendermint",
"tendermint-rpc",
"thiserror 1.0.69",
]
[[package]]
name = "cosmrs"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34e74fa7a22930fe0579bef560f2d64b78415d4c47b9dd976c0635136809471d"
dependencies = [
"bip32",
"cosmos-sdk-proto 0.27.0",
"ecdsa",
"eyre",
"k256",
@@ -3507,6 +3541,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
@@ -3562,6 +3597,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.11"
@@ -3605,6 +3653,39 @@ dependencies = [
"cc",
]
[[package]]
name = "ibc-proto"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a650b51e384e54264b53974feb38e95e37aac70f7f2f9c07eb8022fe15eb8e20"
dependencies = [
"base64 0.22.1",
"bytes",
"cosmos-sdk-proto 0.27.0",
"flex-error",
"ics23",
"informalsystems-pbjson",
"prost 0.13.5",
"serde",
"subtle-encoding",
"tendermint-proto",
"tonic 0.13.1",
]
[[package]]
name = "ics23"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b17f1a5bd7d12ad30a21445cfa5f52fd7651cb3243ba866f9916b1ec112f12"
dependencies = [
"anyhow",
"bytes",
"hex",
"informalsystems-pbjson",
"prost 0.13.5",
"serde",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
@@ -3866,6 +3947,16 @@ dependencies = [
"web-time",
]
[[package]]
name = "informalsystems-pbjson"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aa4a0980c8379295100d70854354e78df2ee1c6ca0f96ffe89afeb3140e3a3d"
dependencies = [
"base64 0.21.7",
"serde",
]
[[package]]
name = "inotify"
version = "0.9.6"
@@ -3929,9 +4020,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "inventory"
version = "0.3.19"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b12ebb6799019b044deaf431eadfe23245b259bba5a2c0796acec3943a3cdb"
checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83"
dependencies = [
"rustversion",
]
@@ -4906,7 +4997,7 @@ name = "nym-api-requests"
version = "0.1.0"
dependencies = [
"bs58",
"cosmrs",
"cosmrs 0.22.0",
"cosmwasm-std",
"ecdsa",
"getset",
@@ -4944,7 +5035,7 @@ version = "0.1.0"
source = "git+https://github.com/nymtech/nym.git?branch=release/2025.11-cheddar#e9bb9792ab723a1ad5fe40cb292dc08d4eb40c2f"
dependencies = [
"bs58",
"cosmrs",
"cosmrs 0.21.1",
"cosmwasm-std",
"ecdsa",
"getset",
@@ -5138,7 +5229,7 @@ dependencies = [
"clap",
"colored",
"comfy-table",
"cosmrs",
"cosmrs 0.22.0",
"cosmwasm-std",
"csv",
"cw-utils",
@@ -5302,7 +5393,7 @@ name = "nym-client-core-gateways-storage"
version = "0.1.0"
dependencies = [
"async-trait",
"cosmrs",
"cosmrs 0.22.0",
"log",
"nym-crypto 0.4.0",
"nym-gateway-requests",
@@ -5662,7 +5753,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bls12_381",
"cosmrs",
"cosmrs 0.22.0",
"log",
"nym-api-requests 0.1.0",
"nym-credentials-interface 0.1.0",
@@ -7704,7 +7795,7 @@ name = "nym-types"
version = "1.0.0"
dependencies = [
"base64 0.22.1",
"cosmrs",
"cosmrs 0.22.0",
"cosmwasm-std",
"eyre",
"hmac",
@@ -7738,7 +7829,7 @@ dependencies = [
"bip32",
"bip39",
"colored",
"cosmrs",
"cosmrs 0.22.0",
"cosmwasm-std",
"cw-controllers",
"cw-utils",
@@ -7789,7 +7880,7 @@ dependencies = [
"bip32",
"bip39",
"colored",
"cosmrs",
"cosmrs 0.21.1",
"cosmwasm-std",
"cw-controllers",
"cw-utils",
@@ -7854,7 +7945,7 @@ dependencies = [
"nym-task 0.1.0",
"nym-ticketbooks-merkle 0.1.0",
"nym-validator-client 0.1.0",
"nyxd-scraper",
"nyxd-scraper-sqlite",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
@@ -7945,7 +8036,7 @@ dependencies = [
name = "nym-wallet-types"
version = "1.0.0"
dependencies = [
"cosmrs",
"cosmrs 0.21.1",
"cosmwasm-std",
"hex-literal",
"nym-config 0.1.0",
@@ -8061,7 +8152,7 @@ dependencies = [
"nym-network-defaults 0.1.0",
"nym-task 0.1.0",
"nym-validator-client 0.1.0",
"nyxd-scraper",
"nyxd-scraper-sqlite",
"reqwest 0.12.15",
"schemars",
"serde",
@@ -8079,18 +8170,39 @@ dependencies = [
]
[[package]]
name = "nyxd-scraper"
name = "nyxd-scraper-psql"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"cosmrs 0.22.0",
"itertools 0.14.0",
"nyxd-scraper-shared",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]]
name = "nyxd-scraper-shared"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"const_format",
"cosmrs",
"cosmos-sdk-proto 0.27.0",
"cosmrs 0.22.0",
"eyre",
"futures",
"humantime",
"ibc-proto",
"prost 0.13.5",
"serde",
"serde_json",
"sha2 0.10.9",
"sqlx",
"tendermint",
"tendermint-rpc",
"thiserror 2.0.12",
@@ -8102,6 +8214,18 @@ dependencies = [
"url",
]
[[package]]
name = "nyxd-scraper-sqlite"
version = "0.1.0"
dependencies = [
"async-trait",
"nyxd-scraper-shared",
"sqlx",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]]
name = "object"
version = "0.36.7"
@@ -11216,7 +11340,7 @@ dependencies = [
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper-timeout",
"hyper-timeout 0.4.1",
"percent-encoding",
"pin-project",
"prost 0.11.9",
@@ -11228,6 +11352,34 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-timeout 0.5.2",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"socket2",
"tokio",
"tokio-stream",
"tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -11256,9 +11408,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"indexmap 2.7.1",
"pin-project-lite",
"slab",
"sync_wrapper 1.0.2",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
+6 -2
View File
@@ -80,7 +80,9 @@ members = [
"common/nymsphinx/params",
"common/nymsphinx/routing",
"common/nymsphinx/types",
"common/nyxd-scraper",
"common/nyxd-scraper-sqlite",
"common/nyxd-scraper-psql",
"common/nyxd-scraper-shared",
"common/pemstore",
"common/serde-helpers",
"common/service-provider-requests-common",
@@ -383,7 +385,9 @@ cw-multi-test = "=2.3.2"
bip32 = { version = "0.5.3", default-features = false }
cosmrs = { version = "0.21.1" }
cosmrs = { version = "0.22.0" }
cosmos-sdk-proto = { version = "0.27.0" }
ibc-proto = { version = "0.52.0" }
tendermint = "0.40.4"
tendermint-rpc = "0.40.4"
prost = { version = "0.13", default-features = false }
@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO validator (consensus_address, consensus_pubkey)\n VALUES ($1, $2)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": []
},
"hash": "0d3709efacf763b06bf14803bb803b5ee5b27879b0026bb0480b3f2722318a75"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM pre_commit WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "1c2fb0e9ffceca21ef8dbea19b116422b1f723d0a316314b50c43c8b29f8891d"
}
@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transaction\n (hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)\n ON CONFLICT (hash) DO UPDATE\n SET height = excluded.height,\n index = excluded.index,\n success = excluded.success,\n messages = excluded.messages,\n memo = excluded.memo,\n signatures = excluded.signatures,\n signer_infos = excluded.signer_infos,\n fee = excluded.fee,\n gas_wanted = excluded.gas_wanted,\n gas_used = excluded.gas_used,\n raw_log = excluded.raw_log,\n logs = excluded.logs\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Int4",
"Bool",
"Json",
"Text",
"TextArray",
"Jsonb",
"Jsonb",
"Int8",
"Int8",
"Text",
"Jsonb"
]
},
"nullable": []
},
"hash": "1e344c1dff8b98eb0eb2e530e28f3cb2eed5b5d35391fd30a4d5f44f2e2178b7"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n ORDER BY height ASC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "2561fb016951ea4cd29e43fb9a4a93e944b0d44ed1f7c1036f306e34372da11c"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": []
},
"hash": "2679cdf11fa66c7920678cde860c57402119ec7c3aae731b0da831327301466f"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE pruning SET last_pruned_height = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "36ba5941aca6e7b604a10b8b0aba70635028f392fe794d6131827b083e1755e1"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_pruned_height FROM pruning\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_pruned_height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "3bdf81a9db6075f6f77224c30553f419a849d4ec45af40b052a4cbf09b44f3ec"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM message WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "52c27143720ddfdfd0f5644b60f5b67fd9281ce1de0653efa53b9d9b93cf335d"
}
@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)\n VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (validator_address, timestamp) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Timestamp",
"Int8",
"Int8"
]
},
"nullable": []
},
"hash": "62e14613f5ffe692346a79086857a22f0444fbc679db1c06b651fb8b5538b278"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Int4",
"Int8",
"Text",
"Timestamp"
]
},
"nullable": []
},
"hash": "64a484fd46d8ec46797f944a4cced56b6e270ce186f0e49528865d1924343b78"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp < $1\n ORDER BY timestamp DESC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "7e82426f5dbcadf1631ba1a806e19cc462d04222fb20ad76de2a40f3f4f8fe15"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp > $1\n ORDER BY timestamp\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "9455331f9be5a3be28e2bd399a36b2e2d6a9ad4b225c4c883aafc4e9f0428008"
}
@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT COUNT(*) as count FROM pre_commit\n WHERE\n validator_address = $1\n AND height >= $2\n AND height <= $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Text",
"Int8",
"Int8"
]
},
"nullable": [
null
]
},
"hash": "bc7795e58ce71893c3f32a19db8e77b7bc0a1af315ffd42c3e68156d6e4ace70"
}
@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM validator\n WHERE EXISTS (\n SELECT 1 FROM pre_commit\n WHERE height = $1\n AND pre_commit.validator_address = validator.consensus_address\n )\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "consensus_address",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "consensus_pubkey",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false
]
},
"hash": "be43d4873911deca784b7be0531ab7bd82ecd68041aa932a56c8ce09623251e4"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_processed_height FROM metadata\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_processed_height",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "c88d07fecc3f33deaa6e93db1469ce71582635df47f52dcf3fd1df4e7be6b96d"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (transaction_hash, index) DO UPDATE\n SET height = excluded.height,\n type = excluded.type,\n value = excluded.value,\n involved_accounts_addresses = excluded.involved_accounts_addresses\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Text",
"Json",
"TextArray",
"Int8"
]
},
"nullable": []
},
"hash": "cc0ae74082d7d8a89f2d3364676890bbf6150ab394c72783114340d4def5f9ef"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM block WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "cdba9b267f143c8a8c6c3d6ed713cf00236490b86779559d84740ec18bcfa3a9"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM transaction WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "d89558c37c51e8e6b1e6a9d5a2b13d0598fd856aa019a0cbbae12d7cafb4672f"
}
+34
View File
@@ -0,0 +1,34 @@
[package]
name = "nyxd-scraper-psql"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
# temp due to cosmrs redefinitions for serde
cosmrs = { workspace = true }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
+102
View File
@@ -0,0 +1,102 @@
# Makefile for nyxd-scraper-psql database management
# --- Configuration ---
TEST_DATABASE_URL := postgres://testuser:testpass@localhost:5433/nyxd_scraper_test
# Docker compose service names
DB_SERVICE_NAME := postgres-test
DB_CONTAINER_NAME := nyxd_scraper_psql_test
# Default target
.PHONY: default
default: help
# --- Main Targets ---
.PHONY: prepare-pg
prepare-pg: test-db-up test-db-wait test-db-migrate test-db-prepare test-db-down ## Setup PostgreSQL and prepare SQLx offline cache
.PHONY: test-db
test-db: test-db-up test-db-wait test-db-migrate test-db-run test-db-down ## Run tests with PostgreSQL database
.PHONY: dev-db
dev-db: test-db-up test-db-wait test-db-migrate ## Start PostgreSQL for development (keeps running)
@echo "PostgreSQL is running on port 5433"
@echo "Connection string: $(TEST_DATABASE_URL)"
# --- Docker Compose Targets ---
.PHONY: test-db-up
test-db-up: ## Start the PostgreSQL test database in the background
@echo "Starting PostgreSQL test database..."
docker compose up -d $(DB_SERVICE_NAME)
.PHONY: test-db-wait
test-db-wait: ## Wait for the PostgreSQL database to be healthy
@echo "Waiting for PostgreSQL database..."
@while ! docker inspect --format='{{.State.Health.Status}}' $(DB_CONTAINER_NAME) 2>/dev/null | grep -q 'healthy'; do \
echo -n "."; \
sleep 1; \
done; \
echo " Database is healthy!"
.PHONY: test-db-down
test-db-down: ## Stop and remove the test database
@echo "Stopping PostgreSQL test database..."
docker compose down
# --- SQLx Targets ---
.PHONY: test-db-migrate
test-db-migrate: ## Run database migrations against PostgreSQL
@echo "Running PostgreSQL migrations..."
DATABASE_URL="$(TEST_DATABASE_URL)" sqlx migrate run --source sql_migrations
.PHONY: test-db-prepare
test-db-prepare: ## Run sqlx prepare for compile-time query verification
@echo "Running sqlx prepare for PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo sqlx prepare
# --- Build and Test Targets ---
.PHONY: test-db-run
test-db-run: ## Run tests with PostgreSQL feature
@echo "Running tests with PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo test --features pg --no-default-features
.PHONY: build-pg
build-pg: ## Build with PostgreSQL feature
@echo "Building with PostgreSQL feature..."
cargo build
.PHONY: check-pg
check-pg: ## Check code with PostgreSQL feature
@echo "Checking code with PostgreSQL feature..."
cargo check
.PHONY: clippy
clippy: clippy-pg
.PHONY: clippy-pg
clippy-pg: ## Run clippy with PostgreSQL feature
@echo "Running clippy with PostgreSQL feature..."
cargo clippy -- -D warnings
# --- Cleanup Targets ---
.PHONY: clean
clean: ## Clean build artifacts and SQLx cache
cargo clean
rm -rf .sqlx
.PHONY: clean-db
clean-db: test-db-down ## Stop database and clean volumes
docker volume rm -f nym-node-status-api_postgres_test_data 2>/dev/null || true
# --- Utility Targets ---
.PHONY: sqlx-cli
sqlx-cli: ## Install sqlx-cli if not already installed
@command -v sqlx >/dev/null 2>&1 || cargo install sqlx-cli --features postgres
.PHONY: psql
psql: ## Connect to the running PostgreSQL database with psql
@docker exec -it $(DB_CONTAINER_NAME) psql -U testuser -d nyxd_scraper_test
.PHONY: help
help: ## Show help for Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+79
View File
@@ -0,0 +1,79 @@
## Quick Start with PostgreSQL
### 1. Install Prerequisites
```bash
# Install sqlx-cli if not already installed
make sqlx-cli
```
### 2. Prepare PostgreSQL for Development
```bash
# This will:
# - Start PostgreSQL in Docker
# - Run migrations
# - Generate SQLx offline query cache
# - Stop the database
make prepare-pg
```
### 3. Build with PostgreSQL
```bash
# Build with PostgreSQL feature
make build-pg
# Or manually:
cargo build
```
### 4. Run with PostgreSQL
```bash
# Start PostgreSQL for development (keeps running)
make dev-db
# In another terminal, run the application
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test \
cargo run
```
## Makefile Targets
```bash
make help # Show all available targets
make prepare-pg # Setup PostgreSQL and prepare SQLx cache
make dev-db # Start PostgreSQL for development
make test-db # Run tests with PostgreSQL
make build-pg # Build with PostgreSQL
make psql # Connect to running PostgreSQL
make clean # Clean build artifacts
make clean-db # Stop database and clean volumes
```
## Environment Variables
See `.env.example` for all configuration options. Key variable:
```bash
# For PostgreSQL:
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test
```
## Troubleshooting
### SQLx Offline Mode
If you see "no cached data for this query" errors:
1. Ensure PostgreSQL is running: `make dev-db`
2. Run: `make test-db-prepare`
### Connection Refused
If you see "Connection refused" errors:
1. Check Docker is running: `docker ps`
2. Check PostgreSQL container: `docker ps | grep nym_node_status_api_postgres_test`
3. Restart database: `make test-db-down && make dev-db`
+8
View File
@@ -0,0 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
fn main() {
if let Ok(database_url) = std::env::var("DATABASE_URL") {
println!("cargo::rustc-env=DATABASE_URL={database_url}");
}
}
@@ -0,0 +1,21 @@
services:
postgres-test:
image: postgres:16-alpine
container_name: nyxd_scraper_psql_test
environment:
POSTGRES_DB: nyxd_scraper_test
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpass
ports:
- '5433:5432' # Map to 5433 to avoid conflicts with default PostgreSQL
healthcheck:
test: [ 'CMD-SHELL', 'pg_isready -U testuser -d nyxd_scraper_test' ]
interval: 5s
timeout: 5s
retries: 5
# Optional: Add volume for persistent data during development
# volumes:
# - postgres_test_data:/var/lib/postgresql/data
# volumes:
# postgres_test_data:
@@ -0,0 +1,127 @@
CREATE TABLE validator
(
consensus_address TEXT NOT NULL PRIMARY KEY, /* Validator consensus address */
consensus_pubkey TEXT NOT NULL UNIQUE /* Validator consensus public key */
);
CREATE TABLE pre_commit
(
validator_address TEXT NOT NULL REFERENCES validator (consensus_address),
height BIGINT NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
voting_power BIGINT NOT NULL,
proposer_priority BIGINT NOT NULL,
UNIQUE (validator_address, timestamp)
);
CREATE INDEX pre_commit_validator_address_index ON pre_commit (validator_address);
CREATE INDEX pre_commit_height_index ON pre_commit (height);
CREATE TABLE block
(
height BIGINT UNIQUE PRIMARY KEY,
hash TEXT NOT NULL UNIQUE,
num_txs INTEGER DEFAULT 0,
total_gas BIGINT DEFAULT 0,
proposer_address TEXT REFERENCES validator (consensus_address),
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
CREATE INDEX block_height_index ON block (height);
CREATE INDEX block_hash_index ON block (hash);
CREATE INDEX block_proposer_address_index ON block (proposer_address);
ALTER TABLE block
SET (
autovacuum_vacuum_scale_factor = 0,
autovacuum_analyze_scale_factor = 0,
autovacuum_vacuum_threshold = 10000,
autovacuum_analyze_threshold = 10000
);
CREATE TABLE transaction
(
hash TEXT NOT NULL,
height BIGINT NOT NULL REFERENCES block (height),
"index" INTEGER NOT NULL, -- <<<=== not present in original bdjuno table, but it's quite useful
success BOOLEAN NOT NULL,
/* Body */
messages JSON NOT NULL DEFAULT '[]'::JSON,
memo TEXT,
signatures TEXT[] NOT NULL,
/* AuthInfo */
signer_infos JSONB NOT NULL DEFAULT '[]'::JSONB,
fee JSONB NOT NULL DEFAULT '{}'::JSONB,
/* Tx response */
gas_wanted BIGINT DEFAULT 0,
gas_used BIGINT DEFAULT 0,
raw_log TEXT,
logs JSONB,
CONSTRAINT unique_tx UNIQUE (hash)
);
CREATE INDEX transaction_hash_index ON transaction (hash);
CREATE INDEX transaction_height_index ON transaction (height);
CREATE TABLE message_type
(
type TEXT NOT NULL UNIQUE,
module TEXT NOT NULL,
label TEXT NOT NULL,
height BIGINT NOT NULL
);
CREATE INDEX message_type_module_index ON message_type (module);
CREATE INDEX message_type_type_index ON message_type (type);
CREATE TABLE message
(
transaction_hash TEXT NOT NULL,
index BIGINT NOT NULL,
type TEXT NOT NULL REFERENCES message_type (type),
value JSON NOT NULL,
involved_accounts_addresses TEXT[] NOT NULL,
height BIGINT NOT NULL,
FOREIGN KEY (transaction_hash) REFERENCES transaction (hash),
CONSTRAINT unique_message_per_tx UNIQUE (transaction_hash, index)
);
CREATE INDEX message_transaction_hash_index ON message (transaction_hash);
CREATE INDEX message_type_index ON message (type);
CREATE INDEX message_involved_accounts_index ON message USING GIN (involved_accounts_addresses);
/**
* This function is used to find all the utils that involve any of the given addresses and have
* type that is one of the specified types.
*/
CREATE FUNCTION messages_by_address(
addresses TEXT[],
types TEXT[],
"limit" BIGINT = 100,
"offset" BIGINT = 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
AND addresses && involved_accounts_addresses
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE FUNCTION messages_by_type(
types text[],
"limit" bigint DEFAULT 100,
"offset" bigint DEFAULT 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE TABLE pruning
(
last_pruned_height BIGINT NOT NULL
);
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PostgresScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
// TOOD: add struct name
#[error("json serialisation failure: {source}")]
SerialisationFailure {
#[from]
source: serde_json::Error,
},
}
impl From<PostgresScraperError> for NyxdScraperStorageError {
fn from(err: PostgresScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::PostgresScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type PostgresNyxdScraper = NyxdScraper<PostgresScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,246 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
prune_blocks, prune_messages, prune_pre_commits, prune_transactions, update_last_pruned,
StorageManager,
};
use crate::storage::transaction::PostgresStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use nyxd_scraper_shared::{default_message_registry, MessageRegistry};
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
#[derive(Clone)]
pub struct PostgresScraperStorage {
pub(crate) manager: StorageManager,
// kinda like very limited cosmos sdk codec
pub(crate) message_registry: MessageRegistry,
}
impl PostgresScraperStorage {
#[instrument]
pub async fn init(connection_string: &str) -> Result<Self, PostgresScraperError> {
debug!("initialising scraper database with '{connection_string}'",);
let connection_pool = match sqlx::PgPool::connect(connection_string).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = PostgresScraperStorage {
manager,
message_registry: default_message_registry(),
};
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), PostgresScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.inner
.commit()
.await
.map_err(|source| PostgresScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<PostgresStorageTransaction, PostgresScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(|inner| PostgresStorageTransaction {
inner,
registry: self.message_registry.clone(),
})
.map_err(|source| PostgresScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, PostgresScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, PostgresScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, PostgresScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for PostgresScraperStorage {
type StorageTransaction = PostgresStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
PostgresScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -0,0 +1,25 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use cosmrs::AccountId;
use itertools::Itertools;
use nyxd_scraper_shared::ParsedTransactionResponse;
use std::str::FromStr;
// replicate behaviour of `CosmosMessageAddressesParser` from juno
pub(crate) fn parse_addresses_from_events(tx: &ParsedTransactionResponse) -> Vec<String> {
let mut addresses: Vec<String> = Vec::new();
for event in &tx.tx_result.events {
for attribute in &event.attributes {
let Ok(value) = attribute.value_str() else {
continue;
};
// Try parsing the address as an account address
if let Ok(address) = AccountId::from_str(value) {
addresses.push(address.to_string());
}
}
}
addresses.into_iter().unique().collect()
}
@@ -0,0 +1,538 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::time::PrimitiveDateTime;
use sqlx::types::{Json, JsonValue};
use sqlx::{Executor, Postgres};
use tokio::time::Instant;
use tracing::{instrument, trace};
#[derive(Clone)]
pub(crate) struct StorageManager {
pub(crate) connection_pool: sqlx::Pool<Postgres>,
}
impl StorageManager {
pub(crate) async fn set_initial_metadata(&self) -> Result<(), sqlx::Error> {
if sqlx::query("SELECT * from metadata")
.fetch_optional(&self.connection_pool)
.await?
.is_none()
{
sqlx::query("INSERT INTO metadata (id, last_processed_height) VALUES (0, 0)")
.execute(&self.connection_pool)
.await?;
}
Ok(())
}
pub(crate) async fn get_lowest_block(&self) -> Result<Option<i64>, sqlx::Error> {
trace!("get_lowest_block");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
ORDER BY height ASC
LIMIT 1
"#,
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_lowest_block", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_first_block_height_after(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_first_block_height_after");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp > $1
ORDER BY timestamp
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_first_block_height_after", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_last_block_height_before(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_last_block_height_before");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp < $1
ORDER BY timestamp DESC
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_block_height_before", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, sqlx::Error> {
trace!("get_signed_between");
let start = Instant::now();
let count = sqlx::query!(
r#"
SELECT COUNT(*) as count FROM pre_commit
WHERE
validator_address = $1
AND height >= $2
AND height <= $3
"#,
consensus_address,
start_height,
end_height
)
.fetch_one(&self.connection_pool)
.await?
.count;
log_db_operation_time("get_signed_between", start);
Ok(count.unwrap_or(0))
}
pub(crate) async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, sqlx::Error> {
trace!("get_precommit");
let start = Instant::now();
let res = sqlx::query_as(
r#"
SELECT * FROM pre_commit
WHERE validator_address = $1
AND height = $2
"#,
)
.bind(consensus_address)
.bind(height)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_precommit", start);
Ok(res)
}
pub(crate) async fn get_block_validators(
&self,
height: i64,
) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_block_validators");
let start = Instant::now();
let res = sqlx::query_as!(
Validator,
r#"
SELECT * FROM validator
WHERE EXISTS (
SELECT 1 FROM pre_commit
WHERE height = $1
AND pre_commit.validator_address = validator.consensus_address
)
"#,
height
)
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_block_validators", start);
Ok(res)
}
pub(crate) async fn get_validators(&self) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_validators");
let start = Instant::now();
let res = sqlx::query_as("SELECT * FROM validator")
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_validators", start);
Ok(res)
}
pub(crate) async fn get_last_processed_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_last_processed_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_processed_height FROM metadata
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_processed_height", start);
if let Some(row) = maybe_record {
Ok(row.last_processed_height as i64)
} else {
Ok(-1)
}
}
pub(crate) async fn get_pruned_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_pruned_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_pruned_height FROM pruning
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_pruned_height", start);
if let Some(row) = maybe_record {
Ok(row.last_pruned_height)
} else {
Ok(-1)
}
}
}
// make those generic over executor so that they could be performed over connection pool and a tx
#[instrument(skip(executor))]
pub(crate) async fn insert_validator<'a, E>(
consensus_address: String,
consensus_pubkey: String,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_validator");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO validator (consensus_address, consensus_pubkey)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
"#,
consensus_address,
consensus_pubkey
)
.execute(executor)
.await?;
log_db_operation_time("insert_validator", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_block<'a, E>(
height: i64,
hash: String,
num_txs: i32,
total_gas: i64,
proposer_address: String,
timestamp: PrimitiveDateTime,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_block");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT DO NOTHING
"#,
height,
hash,
num_txs,
total_gas,
proposer_address,
timestamp
)
.execute(executor)
.await?;
log_db_operation_time("insert_block", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_precommit<'a, E>(
validator_address: String,
height: i64,
timestamp: PrimitiveDateTime,
voting_power: i64,
proposer_priority: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_precommit");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (validator_address, timestamp) DO NOTHING
"#,
validator_address,
height,
timestamp,
voting_power,
proposer_priority
)
.execute(executor)
.await?;
log_db_operation_time("insert_precommit", start);
Ok(())
}
#[instrument(skip(executor))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn insert_transaction<'a, E>(
hash: String,
height: i64,
index: i32,
success: bool,
messages: JsonValue,
memo: String,
signatures: Vec<String>,
signer_infos: JsonValue,
fee: JsonValue,
gas_wanted: i64,
gas_used: i64,
raw_log: String,
logs: JsonValue,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_transaction");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO transaction
(hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (hash) DO UPDATE
SET height = excluded.height,
index = excluded.index,
success = excluded.success,
messages = excluded.messages,
memo = excluded.memo,
signatures = excluded.signatures,
signer_infos = excluded.signer_infos,
fee = excluded.fee,
gas_wanted = excluded.gas_wanted,
gas_used = excluded.gas_used,
raw_log = excluded.raw_log,
logs = excluded.logs
"#,
hash,
height,
index,
success,
messages,
memo,
&signatures,
signer_infos,
fee,
gas_wanted,
gas_used,
raw_log,
logs,
)
.execute(executor)
.await?;
log_db_operation_time("insert_transaction", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_message<'a, E>(
transaction_hash: String,
index: i64,
typ: String,
value: JsonValue,
involved_account_addresses: Vec<String>,
height: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_message");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (transaction_hash, index) DO UPDATE
SET height = excluded.height,
type = excluded.type,
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses
"#,
transaction_hash,
index,
typ,
value,
&involved_account_addresses,
height
)
.execute(executor)
.await?;
log_db_operation_time("insert_message", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_processed<'a, E>(
height: i32,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_processed");
let start = Instant::now();
sqlx::query!(
"UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
height
)
.execute(executor)
.await?;
log_db_operation_time("update_last_processed", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_pruned<'a, E>(height: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_pruned");
let start = Instant::now();
sqlx::query!("UPDATE pruning SET last_pruned_height = $1", height)
.execute(executor)
.await?;
log_db_operation_time("update_last_pruned", start);
Ok(())
}
pub(crate) async fn prune_blocks<'a, E>(oldest_to_keep: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_blocks");
let start = Instant::now();
sqlx::query!("DELETE FROM block WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_blocks", start);
Ok(())
}
pub(crate) async fn prune_pre_commits<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_pre_commits");
let start = Instant::now();
sqlx::query!("DELETE FROM pre_commit WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_pre_commits", start);
Ok(())
}
pub(crate) async fn prune_transactions<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_transactions");
let start = Instant::now();
sqlx::query!("DELETE FROM transaction WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_transactions", start);
Ok(())
}
pub(crate) async fn prune_messages<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_messages");
let start = Instant::now();
sqlx::query!("DELETE FROM message WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_messages", start);
Ok(())
}
@@ -0,0 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod helpers;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,291 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::storage::helpers::{parse_addresses_from_events, PlaceholderStruct};
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
};
use async_trait::async_trait;
use base64::engine::general_purpose;
use base64::Engine as _;
use cosmrs::proto;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
validators, Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction,
};
use nyxd_scraper_shared::{Any, MessageRegistry, ParsedTransactionResponse};
use serde_json::json;
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use sqlx::{Postgres, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, trace, warn};
pub struct PostgresStorageTransaction {
pub(super) inner: Transaction<'static, Postgres>,
pub(super) registry: MessageRegistry,
}
impl Deref for PostgresStorageTransaction {
type Target = Transaction<'static, Postgres>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for PostgresStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl PostgresStorageTransaction {
fn decode_or_skip(&self, msg: &Any) -> Option<serde_json::Value> {
match self.registry.try_decode(msg) {
Ok(decoded) => Some(decoded),
Err(err) => {
warn!("{err}");
None
}
}
}
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), PostgresScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
let offset_datetime: OffsetDateTime = block.header.time.into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as i32,
total_gas,
proposer_address,
time,
self.inner.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
let offset_datetime: OffsetDateTime = (*timestamp).into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_precommit(
validator_address.to_string(),
height,
time,
validator.power.into(),
validator.proposer_priority.value(),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
// bdjuno style, base64 encode them
let signatures = chain_tx
.tx
.signatures
.iter()
.map(|sig| general_purpose::STANDARD.encode(sig))
.collect();
let messages = chain_tx
.tx
.body
.messages
.iter()
.filter_map(|msg| self.decode_or_skip(msg))
.collect::<Vec<_>>();
let signer_infos = chain_tx
.tx
.auth_info
.signer_infos
.iter()
.map(|info| proto::cosmos::tx::v1beta1::SignerInfo::from(info.clone()))
.collect::<Vec<_>>();
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i32,
chain_tx.tx_result.code.is_ok(),
serde_json::Value::Array(messages),
chain_tx.tx.body.memo.clone(),
signatures,
serde_json::to_value(signer_infos)?,
serde_json::to_value(&chain_tx.tx.auth_info.fee)?,
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
json!("null"),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting messages");
for chain_tx in txs {
let involved_addresses = parse_addresses_from_events(chain_tx);
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
serde_json::to_value(self.decode_or_skip(msg))?,
involved_addresses.clone(),
chain_tx.height.into(),
self.inner.as_mut(),
)
.await?
}
}
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for PostgresStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.inner
.commit()
.await
.map_err(PostgresScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height).await
}
}
@@ -1,5 +1,5 @@
[package]
name = "nyxd-scraper"
name = "nyxd-scraper-shared"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
@@ -7,19 +7,23 @@ homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait.workspace = true
base64.workspace = true
const_format = { workspace = true }
cosmrs.workspace = true
cosmos-sdk-proto = { workspace = true, features = ["serde", "cosmwasm"] } # we need to explicitly include serde feature
eyre = { workspace = true }
futures.workspace = true
humantime = { workspace = true }
ibc-proto = { workspace = true, features = ["serde"] }
prost = { workspace = true }
sha2 = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
serde_json = { workspace = true }
tendermint.workspace = true
tendermint-rpc = { workspace = true, features = ["websocket-client", "http-client"] }
thiserror.workspace = true
@@ -31,10 +35,5 @@ tracing.workspace = true
url.workspace = true
# TEMP
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
@@ -7,7 +7,7 @@ use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{persist_block, ScraperStorage};
use crate::storage::{persist_block, NyxdScraperStorage, NyxdScraperTransaction};
use crate::PruningOptions;
use futures::StreamExt;
use std::cmp::max;
@@ -77,7 +77,7 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessor {
pub struct BlockProcessor<S> {
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
@@ -90,7 +90,7 @@ pub struct BlockProcessor {
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -101,14 +101,17 @@ pub struct BlockProcessor {
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
@@ -164,7 +167,11 @@ impl BlockProcessor {
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
@@ -192,10 +199,8 @@ impl BlockProcessor {
}
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
crate::storage::log_db_operation_time("committing processing tx", commit_start);
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
@@ -0,0 +1,146 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::modules::auth::Auth;
use crate::cosmos_module::modules::authz::Authz;
use crate::cosmos_module::modules::bank::Bank;
use crate::cosmos_module::modules::capability::Capability;
use crate::cosmos_module::modules::consensus::Consensus;
use crate::cosmos_module::modules::crisis::Crisis;
use crate::cosmos_module::modules::distribution::Distribution;
use crate::cosmos_module::modules::evidence::Evidence;
use crate::cosmos_module::modules::feegrant::Feegrant;
use crate::cosmos_module::modules::gov_v1::GovV1;
use crate::cosmos_module::modules::gov_v1beta1::GovV1Beta1;
use crate::cosmos_module::modules::group::Group;
use crate::cosmos_module::modules::ibc_core::IbcCore;
use crate::cosmos_module::modules::ibc_fee::IbcFee;
use crate::cosmos_module::modules::ibc_interchain_accounts_controller::IbcInterchainAccountsController;
use crate::cosmos_module::modules::ibc_transfer_v1::IbcTransferV1;
use crate::cosmos_module::modules::ibc_transfer_v2::IbcTransferV2;
use crate::cosmos_module::modules::mint::Mint;
use crate::cosmos_module::modules::nft::Nft;
use crate::cosmos_module::modules::params::Params;
use crate::cosmos_module::modules::slashing::Slashing;
use crate::cosmos_module::modules::staking::Staking;
use crate::cosmos_module::modules::upgrade::Upgrade;
use crate::cosmos_module::modules::vesting::Vesting;
use crate::cosmos_module::modules::wasm::Wasm;
use crate::cosmos_module::CosmosModule;
use crate::error::ScraperError;
use cosmrs::proto::prost::Name;
use cosmrs::proto::traits::Message;
use cosmrs::Any;
use serde::Serialize;
use std::collections::HashMap;
pub(crate) fn default_proto_to_json<T: Message + Default + Serialize>(
msg: &Any,
) -> Result<serde_json::Value, ScraperError> {
let proto = <T as Message>::decode(msg.value.as_slice()).map_err(|error| {
ScraperError::InvalidProtoRepresentation {
type_url: msg.type_url.clone(),
error,
}
})?;
let mut base_serde =
serde_json::to_value(&proto).map_err(|error| ScraperError::JsonSerialisationFailure {
type_url: msg.type_url.clone(),
error,
})?;
// in bdjuno's output we also had @type field with the type_url
let obj = base_serde.as_object_mut().ok_or_else(|| {
ScraperError::JsonSerialisationFailureNotObject {
type_url: msg.type_url.clone(),
}
})?;
obj.insert(
"@type".to_string(),
serde_json::Value::String(msg.type_url.clone()),
);
Ok(base_serde)
}
type ConvertFn = fn(&Any) -> Result<serde_json::Value, ScraperError>;
#[derive(Default, Clone)]
pub struct MessageRegistry {
// type url to function converting bytes to proto and finally to json
registered_types: HashMap<String, ConvertFn>,
}
impl MessageRegistry {
pub fn new() -> Self {
MessageRegistry {
registered_types: Default::default(),
}
}
pub fn register<T>(&mut self)
where
T: Message + Default + Name + Serialize + 'static,
{
self.register_with_custom_fn::<T>(default_proto_to_json::<T>)
}
#[allow(clippy::panic)]
pub fn register_with_custom_fn<T>(&mut self, convert_fn: ConvertFn)
where
T: Message + Default + Name + Serialize + 'static,
{
if self
.registered_types
.insert(<T as Name>::type_url(), convert_fn)
.is_some()
{
// don't allow duplicate registration because it most likely implies bug in the code
panic!("duplicate registration of type {}", <T as Name>::type_url());
}
}
pub fn try_decode(&self, raw: &Any) -> Result<serde_json::Value, ScraperError> {
self.registered_types.get(&raw.type_url).ok_or(
ScraperError::MissingTypeUrlRegistration {
type_url: raw.type_url.clone(),
},
)?(raw)
}
}
pub fn default_message_registry() -> MessageRegistry {
let mut registry = MessageRegistry::new();
let modules: Vec<Box<dyn CosmosModule>> = vec![
Box::new(Auth),
Box::new(Authz),
Box::new(Bank),
Box::new(Capability),
Box::new(Consensus),
Box::new(Wasm),
Box::new(Crisis),
Box::new(Distribution),
Box::new(Evidence),
Box::new(Feegrant),
Box::new(GovV1),
Box::new(GovV1Beta1),
Box::new(Group),
Box::new(IbcCore),
Box::new(IbcFee),
Box::new(IbcTransferV1),
Box::new(IbcTransferV2),
Box::new(IbcInterchainAccountsController),
Box::new(Mint),
Box::new(Nft),
Box::new(Params),
Box::new(Slashing),
Box::new(Staking),
Box::new(Upgrade),
Box::new(Vesting),
];
for module in modules {
module.register_messages(&mut registry)
}
registry
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
pub mod message_registry;
mod modules;
pub trait CosmosModule {
fn register_messages(&self, registry: &mut MessageRegistry);
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::auth::v1beta1::MsgUpdateParams;
pub(crate) struct Auth;
impl CosmosModule for Auth {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgUpdateParams>()
}
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::authz::v1beta1::{MsgExec, MsgGrant, MsgRevoke};
pub(crate) struct Authz;
impl CosmosModule for Authz {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgGrant>();
registry.register::<MsgExec>();
registry.register::<MsgRevoke>();
}
}
@@ -0,0 +1,19 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::bank::v1beta1::{
MsgMultiSend, MsgSend, MsgSetSendEnabled, MsgUpdateParams,
};
pub(crate) struct Bank;
impl CosmosModule for Bank {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSend>();
registry.register::<MsgMultiSend>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgSetSendEnabled>();
}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
pub(crate) struct Capability;
impl CosmosModule for Capability {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
pub(crate) struct Consensus;
impl CosmosModule for Consensus {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::crisis::v1beta1::{MsgUpdateParams, MsgVerifyInvariant};
pub(crate) struct Crisis;
impl CosmosModule for Crisis {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgVerifyInvariant>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,22 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::distribution::v1beta1::{
MsgCommunityPoolSpend, MsgFundCommunityPool, MsgSetWithdrawAddress, MsgUpdateParams,
MsgWithdrawDelegatorReward, MsgWithdrawValidatorCommission,
};
pub(crate) struct Distribution;
impl CosmosModule for Distribution {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgWithdrawDelegatorReward>();
registry.register::<MsgWithdrawValidatorCommission>();
registry.register::<MsgSetWithdrawAddress>();
registry.register::<MsgFundCommunityPool>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgCommunityPoolSpend>();
}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::evidence::v1beta1::MsgSubmitEvidence;
pub(crate) struct Evidence;
impl CosmosModule for Evidence {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitEvidence>()
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::feegrant::v1beta1::{
MsgGrantAllowance, MsgPruneAllowances, MsgRevokeAllowance,
};
pub(crate) struct Feegrant;
impl CosmosModule for Feegrant {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgGrantAllowance>();
registry.register::<MsgRevokeAllowance>();
registry.register::<MsgPruneAllowances>();
}
}
@@ -0,0 +1,21 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::gov::v1::{
MsgDeposit, MsgExecLegacyContent, MsgSubmitProposal, MsgUpdateParams, MsgVote, MsgVoteWeighted,
};
pub(crate) struct GovV1;
impl CosmosModule for GovV1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitProposal>();
registry.register::<MsgDeposit>();
registry.register::<MsgVote>();
registry.register::<MsgVoteWeighted>();
registry.register::<MsgExecLegacyContent>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,19 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::gov::v1beta1::{
MsgDeposit, MsgSubmitProposal, MsgVote, MsgVoteWeighted,
};
pub(crate) struct GovV1Beta1;
impl CosmosModule for GovV1Beta1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitProposal>();
registry.register::<MsgDeposit>();
registry.register::<MsgVote>();
registry.register::<MsgVoteWeighted>();
}
}
@@ -0,0 +1,27 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use tracing::warn;
pub(crate) struct Group;
impl CosmosModule for Group {
fn register_messages(&self, _registry: &mut MessageRegistry) {
warn!("mising cosmos-sdk-proto definition for 'group::MsgCreateGroup'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupMembers'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupAdmin'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupMetadata'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgCreateGroupWithPolicy'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgCreateGroupPolicy'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyAdmin'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyDecisionPolicy'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyMetadata'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgSubmitProposal'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgWithdrawProposal'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgVote'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgExec'");
warn!("mising cosmos-sdk-proto definition for 'group::MsgLeaveGroup'");
}
}
@@ -0,0 +1,70 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::core::channel::{
self,
v1::{
MsgAcknowledgement, MsgChannelCloseConfirm, MsgChannelCloseInit, MsgChannelOpenAck,
MsgChannelOpenConfirm, MsgChannelOpenInit, MsgChannelOpenTry, MsgChannelUpgradeAck,
MsgChannelUpgradeCancel, MsgChannelUpgradeConfirm, MsgChannelUpgradeInit,
MsgChannelUpgradeOpen, MsgChannelUpgradeTimeout, MsgChannelUpgradeTry,
MsgPruneAcknowledgements, MsgRecvPacket, MsgTimeout, MsgTimeoutOnClose,
},
};
use ibc_proto::ibc::core::client::{
self,
v1::{
MsgCreateClient, MsgIbcSoftwareUpgrade, MsgRecoverClient, MsgSubmitMisbehaviour,
MsgUpdateClient, MsgUpgradeClient,
},
};
use ibc_proto::ibc::core::connection::{
self,
v1::{
MsgConnectionOpenAck, MsgConnectionOpenConfirm, MsgConnectionOpenInit, MsgConnectionOpenTry,
},
};
pub(crate) struct IbcCore;
impl CosmosModule for IbcCore {
fn register_messages(&self, registry: &mut MessageRegistry) {
// channel
registry.register::<MsgChannelOpenInit>();
registry.register::<MsgChannelOpenTry>();
registry.register::<MsgChannelOpenAck>();
registry.register::<MsgChannelOpenConfirm>();
registry.register::<MsgChannelCloseInit>();
registry.register::<MsgChannelCloseConfirm>();
registry.register::<MsgRecvPacket>();
registry.register::<MsgTimeout>();
registry.register::<MsgTimeoutOnClose>();
registry.register::<MsgAcknowledgement>();
registry.register::<MsgChannelUpgradeInit>();
registry.register::<MsgChannelUpgradeTry>();
registry.register::<MsgChannelUpgradeAck>();
registry.register::<MsgChannelUpgradeConfirm>();
registry.register::<MsgChannelUpgradeOpen>();
registry.register::<MsgChannelUpgradeTimeout>();
registry.register::<MsgChannelUpgradeCancel>();
registry.register::<channel::v1::MsgUpdateParams>();
registry.register::<MsgPruneAcknowledgements>();
// client
registry.register::<MsgCreateClient>();
registry.register::<MsgUpdateClient>();
registry.register::<MsgUpgradeClient>();
registry.register::<MsgSubmitMisbehaviour>();
registry.register::<MsgRecoverClient>();
registry.register::<MsgIbcSoftwareUpgrade>();
registry.register::<client::v1::MsgUpdateParams>();
// connection
registry.register::<MsgConnectionOpenInit>();
registry.register::<MsgConnectionOpenTry>();
registry.register::<MsgConnectionOpenAck>();
registry.register::<MsgConnectionOpenConfirm>();
registry.register::<connection::v1::MsgUpdateParams>();
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::fee::v1::{
MsgPayPacketFee, MsgPayPacketFeeAsync, MsgRegisterPayee, RegisteredCounterpartyPayee,
};
pub(crate) struct IbcFee;
impl CosmosModule for IbcFee {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgRegisterPayee>();
registry.register::<RegisteredCounterpartyPayee>();
registry.register::<MsgPayPacketFee>();
registry.register::<MsgPayPacketFeeAsync>();
}
}
@@ -0,0 +1,17 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::interchain_accounts::controller::v1::{
MsgRegisterInterchainAccount, MsgSendTx, MsgUpdateParams,
};
pub(crate) struct IbcInterchainAccountsController;
impl CosmosModule for IbcInterchainAccountsController {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgRegisterInterchainAccount>();
registry.register::<MsgSendTx>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::transfer::v1::{MsgTransfer, MsgUpdateParams};
pub(crate) struct IbcTransferV1;
impl CosmosModule for IbcTransferV1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgTransfer>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,10 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
pub(crate) struct IbcTransferV2;
impl CosmosModule for IbcTransferV2 {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::mint::v1beta1::MsgUpdateParams;
pub(crate) struct Mint;
impl CosmosModule for Mint {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgUpdateParams>()
}
}
@@ -0,0 +1,28 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub(crate) mod auth;
pub(crate) mod authz;
pub(crate) mod bank;
pub(crate) mod capability;
pub(crate) mod consensus;
pub(crate) mod crisis;
pub(crate) mod distribution;
pub(crate) mod evidence;
pub(crate) mod feegrant;
pub(crate) mod gov_v1;
pub(crate) mod gov_v1beta1;
pub(crate) mod group;
pub(crate) mod ibc_core;
pub(crate) mod ibc_fee;
pub(crate) mod ibc_interchain_accounts_controller;
pub(crate) mod ibc_transfer_v1;
pub(crate) mod ibc_transfer_v2;
pub(crate) mod mint;
pub(crate) mod nft;
pub(crate) mod params;
pub(crate) mod slashing;
pub(crate) mod staking;
pub(crate) mod upgrade;
pub(crate) mod vesting;
pub(crate) mod wasm;
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
pub(crate) struct Nft;
impl CosmosModule for Nft {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
pub(crate) struct Params;
impl CosmosModule for Params {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
pub(crate) struct Slashing;
impl CosmosModule for Slashing {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,23 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::staking::v1beta1::{
MsgBeginRedelegate, MsgCancelUnbondingDelegation, MsgCreateValidator, MsgDelegate,
MsgEditValidator, MsgUndelegate, MsgUpdateParams,
};
pub(crate) struct Staking;
impl CosmosModule for Staking {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgCreateValidator>();
registry.register::<MsgEditValidator>();
registry.register::<MsgDelegate>();
registry.register::<MsgUndelegate>();
registry.register::<MsgBeginRedelegate>();
registry.register::<MsgCancelUnbondingDelegation>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::upgrade::v1beta1::{MsgCancelUpgrade, MsgSoftwareUpgrade};
pub(crate) struct Upgrade;
impl CosmosModule for Upgrade {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSoftwareUpgrade>();
registry.register::<MsgCancelUpgrade>();
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
use crate::cosmos_module::CosmosModule;
use cosmos_sdk_proto::cosmos::vesting::v1beta1::{
MsgCreatePeriodicVestingAccount, MsgCreatePermanentLockedAccount, MsgCreateVestingAccount,
};
pub(crate) struct Vesting;
impl CosmosModule for Vesting {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgCreateVestingAccount>();
registry.register::<MsgCreatePermanentLockedAccount>();
registry.register::<MsgCreatePeriodicVestingAccount>();
}
}
@@ -0,0 +1,104 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::{default_proto_to_json, MessageRegistry};
use crate::cosmos_module::CosmosModule;
use crate::error::ScraperError;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use cosmos_sdk_proto::cosmwasm::wasm::v1::{
MsgAddCodeUploadParamsAddresses, MsgClearAdmin, MsgExecuteContract, MsgIbcCloseChannel,
MsgIbcSend, MsgInstantiateContract, MsgInstantiateContract2, MsgMigrateContract, MsgPinCodes,
MsgRemoveCodeUploadParamsAddresses, MsgStoreAndInstantiateContract, MsgStoreAndMigrateContract,
MsgStoreCode, MsgSudoContract, MsgUnpinCodes, MsgUpdateAdmin, MsgUpdateContractLabel,
MsgUpdateInstantiateConfig, MsgUpdateParams,
};
use cosmrs::Any;
use prost::Message;
use serde::Serialize;
use tracing::warn;
pub(crate) struct Wasm;
fn decode_wasm_message<T: Message + Default + Serialize>(
msg: &Any,
) -> Result<serde_json::Value, ScraperError> {
let field = "msg";
// 1. perform basic decoding
let mut base = default_proto_to_json::<T>(msg)?;
let Some(encoded_field) = base.get_mut(field) else {
warn!(
"missing field 'msg' in wasm message of type {} - can't perform additional decoding",
msg.type_url
);
return Ok(base);
};
// 2. decode 'msg' field
let as_str =
encoded_field
.as_str()
.ok_or(ScraperError::JsonWasmSerialisationFailureNotString {
field: field.to_string(),
type_url: msg.type_url.clone(),
})?;
let decoded = STANDARD.decode(as_str).map_err(|error| {
ScraperError::JsonWasmSerialisationFailureInvalidBase64Encoding {
field: field.to_string(),
type_url: msg.type_url.clone(),
error,
}
})?;
// 3. replace original 'msg' with the new json
let re_decoded: serde_json::Value = serde_json::from_slice(&decoded).map_err(|error| {
ScraperError::JsonSerialisationFailure {
type_url: format!("{}.{field}", msg.type_url),
error,
}
})?;
*encoded_field = re_decoded;
Ok(base)
}
impl CosmosModule for Wasm {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgIbcSend>();
registry.register::<MsgIbcCloseChannel>();
registry.register::<MsgStoreCode>();
registry.register_with_custom_fn::<MsgInstantiateContract>(|msg| {
decode_wasm_message::<MsgInstantiateContract>(msg)
});
registry.register_with_custom_fn::<MsgInstantiateContract2>(|msg| {
decode_wasm_message::<MsgInstantiateContract2>(msg)
});
registry.register_with_custom_fn::<MsgExecuteContract>(|msg| {
decode_wasm_message::<MsgExecuteContract>(msg)
});
registry.register_with_custom_fn::<MsgMigrateContract>(|msg| {
decode_wasm_message::<MsgMigrateContract>(msg)
});
registry.register_with_custom_fn::<MsgSudoContract>(|msg| {
decode_wasm_message::<MsgSudoContract>(msg)
});
registry.register_with_custom_fn::<MsgStoreAndInstantiateContract>(|msg| {
decode_wasm_message::<MsgStoreAndInstantiateContract>(msg)
});
registry.register_with_custom_fn::<MsgStoreAndMigrateContract>(|msg| {
decode_wasm_message::<MsgStoreAndMigrateContract>(msg)
});
registry.register::<MsgUpdateAdmin>();
registry.register::<MsgClearAdmin>();
registry.register::<MsgUpdateInstantiateConfig>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgPinCodes>();
registry.register::<MsgUnpinCodes>();
registry.register::<MsgAddCodeUploadParamsAddresses>();
registry.register::<MsgRemoveCodeUploadParamsAddresses>();
registry.register::<MsgUpdateContractLabel>();
}
}
@@ -4,17 +4,16 @@
use crate::block_processor::pruning::{
EVERYTHING_PRUNING_INTERVAL, EVERYTHING_PRUNING_KEEP_RECENT,
};
use crate::helpers::MalformedDataError;
use crate::storage::NyxdScraperStorageError;
use tendermint::Hash;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
#[derive(Debug, Error)]
pub enum ScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("storage error: {0}")]
StorageError(#[from] NyxdScraperStorageError),
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
@@ -104,40 +103,26 @@ pub enum ScraperError {
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to send on a closed channel")]
ClosedChannelError,
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
#[error(
"could not find the block proposer ('{proposer}') for height {height} in the validator set"
)]
BlockProposerNotInValidatorSet { height: u32, proposer: String },
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
#[error("pruning.interval must not be set to 0. If you want to disable pruning, select pruning.strategy = \"nothing\"")]
ZeroPruningInterval,
@@ -146,6 +131,49 @@ pub enum ScraperError {
#[error("pruning.keep_recent must not be smaller than {}. got: {keep_recent}. for most aggressive pruning, select pruning.strategy = \"everything\"", EVERYTHING_PRUNING_KEEP_RECENT)]
TooSmallKeepRecent { keep_recent: u32 },
#[error("'{type_url}' is not registered in the message registry")]
MissingTypeUrlRegistration { type_url: String },
#[error("failed to decode message of type '{type_url}': {error}")]
InvalidProtoRepresentation {
type_url: String,
#[source]
error: prost::DecodeError,
},
#[error("failed to encode message of type '{type_url}' to json: '{error}'")]
JsonSerialisationFailure {
type_url: String,
#[source]
error: serde_json::Error,
},
#[error("serialisation of message of type '{type_url}' didn't result in an object!")]
JsonSerialisationFailureNotObject { type_url: String },
#[error("field '{field}' in '{type_url}' is not a string")]
JsonWasmSerialisationFailureNotString { field: String, type_url: String },
#[error("field '{field}' in '{type_url}' has invalid base64 encoding: {error}")]
JsonWasmSerialisationFailureInvalidBase64Encoding {
field: String,
type_url: String,
#[source]
error: base64::DecodeError,
},
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxCommitFailure { source }
}
}
impl<T> From<SendError<T>> for ScraperError {
+66
View File
@@ -0,0 +1,66 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{account, PublicKey};
use tendermint::{validator, Hash};
use tendermint_rpc::endpoint::validators;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MalformedDataError {
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
}
pub fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, MalformedDataError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| MalformedDataError::MalformedValidatorPubkey { source })
}
pub fn validator_consensus_address(id: account::Id) -> Result<AccountId, MalformedDataError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| MalformedDataError::MalformedValidatorAddress { source })
}
pub fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, MalformedDataError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(MalformedDataError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
@@ -1,14 +1,12 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
pub(crate) mod block_processor;
pub(crate) mod block_requester;
pub mod constants;
mod cosmos_module;
pub mod error;
pub(crate) mod helpers;
pub mod helpers;
pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
@@ -16,6 +14,11 @@ pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use cosmos_module::{
message_registry::{default_message_registry, MessageRegistry},
CosmosModule,
};
pub use cosmrs::Any;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
pub use storage::{NyxdScraperStorage, NyxdScraperTransaction};
@@ -3,7 +3,7 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait BlockModule {
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -16,6 +16,6 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait TxModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -8,10 +8,10 @@ use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::ScraperStorage;
use crate::storage::NyxdScraperStorage;
use crate::PruningOptions;
use futures::future::join_all;
use std::path::PathBuf;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
@@ -40,7 +40,8 @@ pub struct Config {
/// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
pub rpc_url: Url,
pub database_path: PathBuf,
/// Points to either underlying file (sqlite) or connection string (postgres)
pub database_storage: String,
pub pruning_options: PruningOptions,
@@ -49,7 +50,8 @@ pub struct Config {
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
pub struct NyxdScraperBuilder<S> {
_storage: PhantomData<S>,
config: Config,
block_modules: Vec<Box<dyn BlockModule + Send>>,
@@ -57,9 +59,13 @@ pub struct NyxdScraperBuilder {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
impl NyxdScraperBuilder {
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
let scraper = NyxdScraper::new(self.config).await?;
impl<S> NyxdScraperBuilder<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub async fn build_and_start(self) -> Result<NyxdScraper<S>, ScraperError> {
let scraper = NyxdScraper::<S>::new(self.config).await?;
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
@@ -110,6 +116,7 @@ impl NyxdScraperBuilder {
pub fn new(config: Config) -> Self {
NyxdScraperBuilder {
_storage: PhantomData,
config,
block_modules: vec![],
tx_modules: vec![],
@@ -133,24 +140,28 @@ impl NyxdScraperBuilder {
}
}
pub struct NyxdScraper {
pub struct NyxdScraper<S> {
config: Config,
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
}
impl NyxdScraper {
pub fn builder(config: Config) -> NyxdScraperBuilder {
impl<S> NyxdScraper<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub fn builder(config: Config) -> NyxdScraperBuilder<S> {
NyxdScraperBuilder::new(config)
}
pub async fn new(config: Config) -> Result<Self, ScraperError> {
config.pruning_options.validate()?;
let storage = ScraperStorage::init(&config.database_path).await?;
let storage = S::initialise(&config.database_storage).await?;
let rpc_client = RpcClient::new(&config.rpc_url)?;
Ok(NyxdScraper {
@@ -163,14 +174,14 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
pub fn storage(&self) -> &S {
&self.storage
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
mut block_processor: BlockProcessor,
mut block_processor: BlockProcessor<S>,
mut chain_subscriber: ChainSubscriber,
) {
self.task_tracker
@@ -336,7 +347,7 @@ impl NyxdScraper {
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
@@ -344,7 +355,7 @@ impl NyxdScraper {
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};
pub fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
@@ -0,0 +1,124 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use async_trait::async_trait;
use thiserror::Error;
use tracing::warn;
pub use crate::block_processor::types::FullBlockInformation;
pub use crate::ParsedTransactionResponse;
pub use tendermint::block::{Commit, CommitSig};
pub use tendermint::Block;
pub use tendermint_rpc::endpoint::validators;
pub mod helpers;
// a workaround for needing associated type (which is a no-no in dynamic dispatch)
#[derive(Error, Debug)]
#[error(transparent)]
pub struct NyxdScraperStorageError(Box<dyn std::error::Error + Send + Sync>);
impl NyxdScraperStorageError {
pub fn new<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
NyxdScraperStorageError(Box::new(error))
}
}
#[async_trait]
pub trait NyxdScraperStorage: Clone + Sized {
type StorageTransaction: NyxdScraperTransaction;
/// Either connection string (postgres) or storage path (sqlite)
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError>;
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError>;
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError>;
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError>;
}
#[async_trait]
pub trait NyxdScraperTransaction {
async fn commit(mut self) -> Result<(), NyxdScraperStorageError>;
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
store_precommits: bool,
) -> Result<(), ScraperError>
where
Tx: NyxdScraperTransaction,
{
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
tx.persist_block_data(&block.block, total_gas).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
tx.persist_txs(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
tx.update_last_processed(block.block.header.height.into())
.await?;
Ok(())
}
+28
View File
@@ -0,0 +1,28 @@
[package]
name = "nyxd-scraper-sqlite"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
@@ -0,0 +1,10 @@
/*
* Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
CREATE TABLE METADATA
(
id INTEGER PRIMARY KEY CHECK (id = 0),
last_processed_height INTEGER NOT NULL
);
+36
View File
@@ -0,0 +1,36 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SqliteScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
}
impl From<SqliteScraperError> for NyxdScraperStorageError {
fn from(err: SqliteScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::SqliteScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type SqliteNyxdScraper = NyxdScraper<SqliteScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,251 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
prune_blocks, prune_messages, prune_pre_commits, prune_transactions, update_last_pruned,
StorageManager,
};
use crate::storage::transaction::SqliteStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::types::time::OffsetDateTime;
use sqlx::ConnectOptions;
use std::fmt::Debug;
use std::path::Path;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
#[derive(Clone)]
pub struct SqliteScraperStorage {
pub(crate) manager: StorageManager,
}
impl SqliteScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(
database_path: P,
) -> Result<Self, SqliteScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = SqliteScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), SqliteScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.0.commit()
.await
.map_err(|source| SqliteScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<SqliteStorageTransaction, SqliteScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(SqliteStorageTransaction)
.map_err(|source| SqliteScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, SqliteScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, SqliteScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for SqliteScraperStorage {
type StorageTransaction = SqliteStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
SqliteScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -1,8 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::log_db_operation_time;
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::time::OffsetDateTime;
use sqlx::{Executor, Sqlite};
use tokio::time::Instant;
@@ -1,2 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,30 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use sqlx::types::time::OffsetDateTime;
use sqlx::FromRow;
#[derive(Debug, Clone, Eq, PartialEq, Hash, FromRow)]
pub struct Validator {
pub consensus_address: String,
pub consensus_pubkey: String,
}
#[derive(Debug, Clone, FromRow)]
pub struct Block {
pub height: i64,
pub hash: String,
pub num_txs: u32,
pub total_gas: i64,
pub proposer_address: String,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Clone, FromRow)]
pub struct CommitSignature {
pub height: i64,
pub validator_address: String,
pub voting_power: i64,
pub proposer_priority: i64,
pub timestamp: OffsetDateTime,
}
@@ -0,0 +1,236 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
};
use async_trait::async_trait;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
validators, Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction,
};
use nyxd_scraper_shared::ParsedTransactionResponse;
use sqlx::{Sqlite, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, trace, warn};
pub struct SqliteStorageTransaction(pub(crate) Transaction<'static, Sqlite>);
impl Deref for SqliteStorageTransaction {
type Target = Transaction<'static, Sqlite>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SqliteStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl SqliteStorageTransaction {
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), SqliteScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
self.0.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
self.0.as_mut(),
)
.await?
}
}
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for SqliteStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.0
.commit()
.await
.map_err(SqliteScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
-46
View File
@@ -1,46 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use crate::error::ScraperError;
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{account, PublicKey};
use tendermint::{validator, Hash};
use tendermint_rpc::endpoint::validators;
pub(crate) fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub(crate) fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, ScraperError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| ScraperError::MalformedValidatorPubkey { source })
}
pub(crate) fn validator_consensus_address(id: account::Id) -> Result<AccountId, ScraperError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| ScraperError::MalformedValidatorAddress { source })
}
pub(crate) fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub(crate) fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, ScraperError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(ScraperError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
-394
View File
@@ -1,394 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_processor::types::{FullBlockInformation, ParsedTransactionResponse},
error::ScraperError,
storage::{
manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
prune_blocks, prune_messages, prune_pre_commits, prune_transactions,
update_last_processed, update_last_pruned, StorageManager,
},
models::{CommitSignature, Validator},
},
};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
ConnectOptions, Sqlite, Transaction,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
block::{Commit, CommitSig},
Block,
};
use tendermint_rpc::endpoint::validators;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn};
mod helpers;
mod manager;
pub mod models;
pub type StorageTransaction = Transaction<'static, Sqlite>;
#[derive(Clone)]
pub struct ScraperStorage {
pub(crate) manager: StorageManager,
}
pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = ScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), ScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut *tx).await?;
prune_transactions(oldest_to_keep.into(), &mut *tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut *tx).await?;
prune_blocks(oldest_to_keep.into(), &mut *tx).await?;
update_last_pruned(current_height.into(), &mut *tx).await?;
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(&self) -> Result<StorageTransaction, ScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map_err(|source| ScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, ScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, ScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(&self, height: i64) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
// persist validators
persist_validators(&block.validators, tx).await?;
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
persist_txs(&block.transactions, tx).await?;
// persist messages (inside the transactions)
persist_messages(&block.transactions, tx).await?;
update_last_processed(block.block.header.height.into(), tx.as_mut()).await?;
Ok(())
}
async fn persist_validators(
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = crate::helpers::validator_consensus_address(validator.address)?;
let consensus_pubkey = crate::helpers::validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
block: &Block,
total_gas: i64,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
let proposer_address =
crate::helpers::validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
tx.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
commits: &Commit,
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = crate::helpers::validator_info(*validator_id, validators)?;
let validator_address = crate::helpers::validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
tx.as_mut(),
)
.await?
}
}
Ok(())
}
+1 -1
View File
@@ -43,7 +43,7 @@ nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-coconut-dkg-common = { path = "../common/cosmwasm-smart-contracts/coconut-dkg" }
nyxd-scraper = { path = "../common/nyxd-scraper" }
nyxd-scraper-sqlite = { path = "../common/nyxd-scraper-sqlite" }
nym-ticketbooks-merkle = { path = "../common/ticketbooks-merkle" }
nym-serde-helpers = { path = "../common/serde-helpers", features = ["base64"] }
nym-pemstore = { path = "../common/pemstore" }
@@ -3,7 +3,7 @@
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
@@ -24,7 +24,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
SqliteNyxdScraper::new(config.scraper_config())
.await?
.unsafe_process_single_block(args.height)
.await?;
@@ -3,7 +3,7 @@
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
@@ -37,7 +37,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
SqliteNyxdScraper::new(config.scraper_config())
.await?
.unsafe_process_block_range(args.start_height, args.stop_height)
.await?;
+4 -4
View File
@@ -12,7 +12,7 @@ use nym_config::{
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
};
use nym_validator_client::nyxd::{AccountId, Coin};
use nyxd_scraper::{PruningOptions, StartingBlockOpts};
use nyxd_scraper_sqlite::{PruningOptions, StartingBlockOpts};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::io;
@@ -119,11 +119,11 @@ impl Config {
}
}
pub fn scraper_config(&self) -> nyxd_scraper::Config {
nyxd_scraper::Config {
pub fn scraper_config(&self) -> nyxd_scraper_sqlite::Config {
nyxd_scraper_sqlite::Config {
websocket_url: self.nyxd_scraper.websocket_url.clone(),
rpc_url: self.base.upstream_nyxd.clone(),
database_path: self.storage_paths.nyxd_scraper.clone(),
database_storage: self.storage_paths.nyxd_scraper.clone(),
pruning_options: self.nyxd_scraper.pruning,
store_precommits: self.nyxd_scraper.store_precommits,
start_block: StartingBlockOpts {

Some files were not shown because too many files have changed in this diff Show More