Add /v3/nym-nodes (#5569)

* Add /v3/nym-nodes
- returns extended node info from local DB
- endpoint caching
- add bond_info & self_described to DB nym_nodes
- update mixnode & gateway bond status on data refresh
- add `active` column to DB nym_nodes
- use only active & bonded nodes in scraping/testrun tasks

* Improve log

* PR feedback
- remove active field from nym_nodes
- delete obsolete nym_nodes

* node-status-api: cargo sqlx prepare

* Remove guardrails in CI file

* Revert "node-status-api: cargo sqlx prepare"

This reverts commit 1fcd895f0d.

* Try to ignore sqlx files

* cargo sqlx prepare

* Repair harbor tag check

* Try without checkout action

* add awk

* Update log
This commit is contained in:
dynco-nym
2025-03-15 00:17:40 +01:00
committed by GitHub
parent 3baac1292d
commit 3f8abdb74f
62 changed files with 1237 additions and 419 deletions
+1
View File
@@ -1 +1,2 @@
nym-validator-rewarder/.sqlx/** diff=nodiff
nym-node-status-api/nym-node-status-api/.sqlx/** diff=nodiff
+13 -13
View File
@@ -22,17 +22,16 @@ jobs:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if git tag exists
uses: actions/checkout@v4
- run: |
run: |
TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
if [[ -z "$TAG" ]]; then
echo "Tag is empty"
exit 1
fi
if git ls-remote --tags origin | grep -q "refs/tags/$TAG$" ; then
git ls-remote --tags origin | awk '{print $2}'
if git ls-remote --tags origin | awk '{print $2}' | grep -q "refs/tags/$TAG$" ; then
echo "Tag '$TAG' ALREADY EXISTS on the remote"
# TODO uncomment after bumping NS API version
# exit 1
exit 1
else
echo "Tag '$TAG' does not exist on the remote"
fi
@@ -45,13 +44,14 @@ jobs:
echo "Tag is empty"
exit 1
fi
curl -su "${{ secrets.HARBOR_ROBOT_USERNAME }}":"${{ secrets.HARBOR_ROBOT_SECRET }}" "$registry/v2/$repo_name/tags/list" | jq
curl -su ${{ secrets.HARBOR_ROBOT_USERNAME }}:${{ secrets.HARBOR_ROBOT_SECRET }} "$registry/v2/$repo_name/tags/list" | jq -e --arg search $TAG '.tags | index($search)' > /dev/null
result=$?
if [ "${result}" -eq 0 ]; then
echo "Version '$TAG' defined in Cargo.toml already exists as tag in harbor repo"
# TODO uncomment after bumping NS API version
# exit 1
curl -su ${{ secrets.HARBOR_ROBOT_USERNAME }}:${{ secrets.HARBOR_ROBOT_SECRET }} "$registry/v2/$repo_name/tags/list" | jq
exists=$(curl -su ${{ secrets.HARBOR_ROBOT_USERNAME }}:${{ secrets.HARBOR_ROBOT_SECRET }} "$registry/v2/$repo_name/tags/list" | jq --arg tag $TAG '.tags | contains([$tag])' )
if [[ $exists = "true" ]]; then
echo "Version '$TAG' defined in Cargo.toml ALREADY EXISTS as tag in harbor repo"
exit 1
elif [[ $exists = "false" ]]; then
echo "Version '$TAG' doesn't exist on the remote"
else
echo "Version '$TAG' doesn't exist on the remote"
echo "Unknown output '$exists'"
exit 1
fi
Generated
+1 -1
View File
@@ -6339,7 +6339,7 @@ dependencies = [
[[package]]
name = "nym-node-status-api"
version = "1.0.2"
version = "1.0.3"
dependencies = [
"ammonia",
"anyhow",
@@ -3,7 +3,7 @@
set -eu
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
probe_git_ref="nym-vpn-core-v1.3.2"
probe_git_ref="nym-vpn-core-v1.4.0"
crate_root=$(dirname $(realpath "$0"))
monorepo_root=$(realpath "${crate_root}/../..")
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_daily_stats (\n mix_id, date_utc,\n total_stake, packets_received,\n packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id, date_utc) DO UPDATE SET\n total_stake = excluded.total_stake,\n packets_received = mixnode_daily_stats.packets_received + excluded.packets_received,\n packets_sent = mixnode_daily_stats.packets_sent + excluded.packets_sent,\n packets_dropped = mixnode_daily_stats.packets_dropped + excluded.packets_dropped\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "01ee4a30bc3104712e5bc371a45d614a89d88adf02358800433e06100c13c548"
}
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT mix_id as node_id, host, http_api_port \n FROM mixnodes \n WHERE bonded = true\n ",
"query": "\n SELECT mix_id as node_id, host, http_api_port\n FROM mixnodes\n WHERE bonded = true\n ",
"describe": {
"columns": [
{
@@ -28,5 +28,5 @@
false
]
},
"hash": "8c6c1c67df2d06da74825872f448d30505801b56506668499f9da718f1d6b8f8"
"hash": "021c6c65d1ed806d8430bef7883906b42a7e4b280c8efb32db15d7c6a51d7a27"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_description (\n mix_id, moniker, website, security_contact, details, last_updated_utc\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (mix_id) DO UPDATE SET\n moniker = excluded.moniker,\n website = excluded.website,\n security_contact = excluded.security_contact,\n details = excluded.details,\n last_updated_utc = excluded.last_updated_utc\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "06065394c157927e4002ddd5c7c1af626ae15728d615f539470cd7c189312385"
}
@@ -0,0 +1,44 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n date_utc as \"date_utc!\",\n SUM(total_stake) as \"total_stake!: i64\",\n SUM(packets_received) as \"total_packets_received!: i64\",\n SUM(packets_sent) as \"total_packets_sent!: i64\",\n SUM(packets_dropped) as \"total_packets_dropped!: i64\"\n FROM (\n SELECT\n date_utc,\n n.total_stake,\n n.packets_received,\n n.packets_sent,\n n.packets_dropped\n FROM nym_node_daily_mixing_stats n\n UNION ALL\n SELECT\n m.date_utc,\n m.total_stake,\n m.packets_received,\n m.packets_sent,\n m.packets_dropped\n FROM mixnode_daily_stats m\n LEFT JOIN nym_node_daily_mixing_stats ON m.mix_id = nym_node_daily_mixing_stats.node_id\n WHERE nym_node_daily_mixing_stats.node_id IS NULL\n )\n GROUP BY date_utc\n ORDER BY date_utc ASC\n ",
"describe": {
"columns": [
{
"name": "date_utc!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "total_stake!: i64",
"ordinal": 1,
"type_info": "Int"
},
{
"name": "total_packets_received!: i64",
"ordinal": 2,
"type_info": "Int"
},
{
"name": "total_packets_sent!: i64",
"ordinal": 3,
"type_info": "Int"
},
{
"name": "total_packets_dropped!: i64",
"ordinal": 4,
"type_info": "Int"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
true,
true,
true
]
},
"hash": "124d45b9604439584650f401607c46bdbd162c7c689f74fe9ddfdfd48f5ddc07"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "UPDATE mixnodes\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_packet_stats_raw (\n mix_id, timestamp_utc, packets_received, packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "21e44766729777756f6eb04bf3b81df3e591008a1e3fd664ed83ca86ac51bd8c"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "SELECT\n node_id,\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n bond_info IS NOT NULL\n AND\n self_described IS NOT NULL\n ",
"describe": {
"columns": [
{
"name": "node_id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "bond_info: serde_json::Value",
"ordinal": 1,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
true
]
},
"hash": "227539374e7473f6f9642289c5b5d1bcd636315ab23537cb5f6d2f82a2bcb7bf"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "\n SELECT gateway_identity_key\n FROM gateways\n WHERE bonded = true\n ",
"describe": {
"columns": [
{
"name": "gateway_identity_key",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "25300e435780101fa207c8e26ef2f49ba5db84d63e89440bb494e8327fe73686"
}
@@ -0,0 +1,86 @@
{
"db_name": "SQLite",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n AND\n bond_info IS NOT NULL\n ",
"describe": {
"columns": [
{
"name": "node_id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "ed25519_identity_pubkey",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "total_stake",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "ip_addresses!: serde_json::Value",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "mix_port",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "x25519_sphinx_pubkey",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "node_role: serde_json::Value",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "supported_roles: serde_json::Value",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "entry: serde_json::Value",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "performance",
"ordinal": 9,
"type_info": "Text"
},
{
"name": "self_described: serde_json::Value",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "bond_info: serde_json::Value",
"ordinal": 11,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
true,
false,
true,
true
]
},
"hash": "283f49a65c7d70bf271702ff6a5c7ad6e68c81932d295ff18ed198c54706a57c"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_description (\n mix_id, moniker, website, security_contact, details, last_updated_utc\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (mix_id) DO UPDATE SET\n moniker = excluded.moniker,\n website = excluded.website,\n security_contact = excluded.security_contact,\n details = excluded.details,\n last_updated_utc = excluded.last_updated_utc\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "3825020ab0ecbffe83270c57ec74fb7987a6807d3381a6068c79b127a668b190"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO mixnodes\n (mix_id, identity_key, bonded, total_stake,\n host, http_api_port, full_details,\n self_described, last_updated_utc, is_dp_delegatee)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id) DO UPDATE SET\n bonded=excluded.bonded,\n total_stake=excluded.total_stake, host=excluded.host,\n http_api_port=excluded.http_api_port,\n full_details=excluded.full_details,self_described=excluded.self_described,\n last_updated_utc=excluded.last_updated_utc,\n is_dp_delegatee = excluded.is_dp_delegatee;",
"describe": {
"columns": [],
"parameters": {
"Right": 10
},
"nullable": []
},
"hash": "3cd5cb4bfca4243925da4ddbccd811e842090e98982e1032670df77961870b32"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536"
}
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"gateway_identity_key!\",\n self_described as \"self_described?\",\n explorer_pretty_bond as \"explorer_pretty_bond?\"\n FROM gateways\n WHERE gateway_identity_key = ?\n ORDER BY gateway_identity_key\n LIMIT 1",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"gateway_identity_key!\",\n self_described as \"self_described?\",\n explorer_pretty_bond as \"explorer_pretty_bond?\"\n FROM gateways\n WHERE gateway_identity_key = ?\n AND bonded = true\n ORDER BY gateway_identity_key\n LIMIT 1",
"describe": {
"columns": [
{
@@ -34,5 +34,5 @@
true
]
},
"hash": "3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad"
"hash": "3e7e987780937873cdb393b157d7708c9f01047b0689eb0d4f7a973b328c609d"
}
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.blacklisted as \"blacklisted: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n COALESCE(gd.moniker, \"NA\") as \"moniker!\",\n COALESCE(gd.website, \"NA\") as \"website!\",\n COALESCE(gd.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(gd.details, \"NA\") as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n COALESCE(gd.moniker, \"NA\") as \"moniker!\",\n COALESCE(gd.website, \"NA\") as \"website!\",\n COALESCE(gd.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(gd.details, \"NA\") as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"describe": {
"columns": [
{
@@ -14,63 +14,58 @@
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"name": "performance!",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "performance!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "self_described?",
"ordinal": 4,
"ordinal": 3,
"type_info": "Text"
},
{
"name": "explorer_pretty_bond?",
"ordinal": 5,
"ordinal": 4,
"type_info": "Text"
},
{
"name": "last_probe_result?",
"ordinal": 6,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "last_probe_log?",
"ordinal": 7,
"ordinal": 6,
"type_info": "Text"
},
{
"name": "last_testrun_utc?",
"ordinal": 8,
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "last_updated_utc!",
"ordinal": 9,
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 10,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 11,
"ordinal": 10,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 12,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 13,
"ordinal": 12,
"type_info": "Text"
}
],
@@ -82,7 +77,6 @@
false,
false,
false,
false,
true,
true,
true,
@@ -94,5 +88,5 @@
false
]
},
"hash": "71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1"
"hash": "3eb1d8491bda3c1d6e071b6eb364b9a979f4bdb11ea81b2d0f022555bab51ecb"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n total_stake\n FROM mixnodes\n WHERE mix_id = ?\n ",
"describe": {
"columns": [
{
"name": "total_stake",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "3fc2baabf194b147b20be2a49401cc0c100a1d7a7c347393adde2410fa6f4dfe"
}
@@ -1,32 +0,0 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO nym_node_daily_mixing_stats (\n node_id, date_utc,\n total_stake, packets_received,\n packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(node_id, date_utc) DO UPDATE SET\n total_stake = excluded.total_stake,\n packets_received = nym_node_daily_mixing_stats.packets_received + excluded.packets_received,\n packets_sent = nym_node_daily_mixing_stats.packets_sent + excluded.packets_sent,\n packets_dropped = nym_node_daily_mixing_stats.packets_dropped + excluded.packets_dropped\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "5912ea335a957d217f5e2b3a63a25b31715c2098310fe7a9db688bc2fd36aad4"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO nym_nodes\n (node_id, ed25519_identity_pubkey,\n total_stake,\n ip_addresses, mix_port,\n x25519_sphinx_pubkey, node_role,\n supported_roles, entry,\n self_described,\n bond_info,\n performance, last_updated_utc\n )\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(node_id) DO UPDATE SET\n ed25519_identity_pubkey=excluded.ed25519_identity_pubkey,\n ip_addresses=excluded.ip_addresses,\n mix_port=excluded.mix_port,\n x25519_sphinx_pubkey=excluded.x25519_sphinx_pubkey,\n node_role=excluded.node_role,\n supported_roles=excluded.supported_roles,\n entry=excluded.entry,\n self_described=excluded.self_described,\n bond_info=excluded.bond_info,\n performance=excluded.performance,\n last_updated_utc=excluded.last_updated_utc\n ;",
"describe": {
"columns": [],
"parameters": {
"Right": 13
},
"nullable": []
},
"hash": "664e059ac2c58e1115fe214376a6b326b31c93298f20019772cce2e277a194f8"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "INSERT INTO mixnodes\n (mix_id, identity_key, bonded, total_stake,\n host, http_api_port, blacklisted, full_details,\n self_described, last_updated_utc, is_dp_delegatee)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id) DO UPDATE SET\n bonded=excluded.bonded,\n total_stake=excluded.total_stake, host=excluded.host,\n http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,\n full_details=excluded.full_details,self_described=excluded.self_described,\n last_updated_utc=excluded.last_updated_utc,\n is_dp_delegatee = excluded.is_dp_delegatee;",
"describe": {
"columns": [],
"parameters": {
"Right": 11
},
"nullable": []
},
"hash": "6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "SELECT\n node_id,\n self_described as \"self_described: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n ",
"describe": {
"columns": [
{
"name": "node_id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "self_described: serde_json::Value",
"ordinal": 1,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
true
]
},
"hash": "7526390c3d17622564067f153867295a279179315650f6fb363d5f3177bf70e3"
}
@@ -1,32 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT packets_received, packets_sent, packets_dropped\n FROM mixnode_packet_stats_raw\n WHERE mix_id = ?\n ORDER BY timestamp_utc DESC\n LIMIT 1 OFFSET 1\n ",
"describe": {
"columns": [
{
"name": "packets_received",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "packets_sent",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "packets_dropped",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true,
true,
true
]
},
"hash": "7b3e44e1dea0568d331017216e5375684086bbeb80f3b251adc761bae0dafe92"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_packet_stats_raw (\n mix_id, timestamp_utc, packets_received, packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "84e784b7ceb479f37fa312dd9a8bb0b3debc04cd251cdfaa438bce7aca7c5992"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "UPDATE gateways\n SET blacklisted = true\n WHERE gateway_identity_key = ?;",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO nym_node_descriptions (\n node_id, moniker, website, security_contact, details, last_updated_utc\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (node_id) DO UPDATE SET\n moniker = excluded.moniker,\n website = excluded.website,\n security_contact = excluded.security_contact,\n details = excluded.details,\n last_updated_utc = excluded.last_updated_utc\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "9104e50524ad5a103bd199a0531d73b74876e9aecda2117227e2e180258d91a1"
}
@@ -0,0 +1,86 @@
{
"db_name": "SQLite",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n ",
"describe": {
"columns": [
{
"name": "node_id",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "ed25519_identity_pubkey",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "total_stake",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "ip_addresses!: serde_json::Value",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "mix_port",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "x25519_sphinx_pubkey",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "node_role: serde_json::Value",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "supported_roles: serde_json::Value",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "entry: serde_json::Value",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "performance",
"ordinal": 9,
"type_info": "Text"
},
{
"name": "self_described: serde_json::Value",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "bond_info: serde_json::Value",
"ordinal": 11,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
true,
false,
true,
true
]
},
"hash": "9334f0c91252fcd7ec72558a271222615bb282e5334665700709ae475a5daea2"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO gateways\n (gateway_identity_key, bonded,\n self_described, explorer_pretty_bond,\n last_updated_utc, performance)\n VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(gateway_identity_key) DO UPDATE SET\n bonded=excluded.bonded,\n self_described=excluded.self_described,\n explorer_pretty_bond=excluded.explorer_pretty_bond,\n last_updated_utc=excluded.last_updated_utc,\n performance = excluded.performance;",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "9796d354ae075eab4cbd3438839c39da94025494395ec7b093aefef696f2d0c5"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n COALESCE(packets_received, 0) as packets_received,\n COALESCE(packets_sent, 0) as packets_sent,\n COALESCE(packets_dropped, 0) as packets_dropped\n FROM nym_nodes_packet_stats_raw\n WHERE node_id = ?\n ORDER BY timestamp_utc DESC\n LIMIT 1 OFFSET 1\n ",
"describe": {
"columns": [
{
"name": "packets_received",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "packets_sent",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "packets_dropped",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "97f4b340bb0f6c9c78d77b24d4f9d3d13ab61cbaac6f8c3cfc06c95e0f13252f"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO mixnode_daily_stats (\n mix_id, date_utc, total_stake, packets_received, packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id, date_utc) DO UPDATE SET\n total_stake = excluded.total_stake,\n packets_received = mixnode_daily_stats.packets_received + excluded.packets_received,\n packets_sent = mixnode_daily_stats.packets_sent + excluded.packets_sent,\n packets_dropped = mixnode_daily_stats.packets_dropped + excluded.packets_dropped\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "a14d5767512bc88ac47b667c47a773b548ccb20bc539839766080d8455439339"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM nym_nodes\n WHERE node_id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "b41fc93f5dc7a397e8898776bf1335d9c26fe1447309f46623bcfee4537991b1"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE\n mixnodes\n SET\n bonded = false\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "c42917c9542c1d720d92035863064741aefc9f7a7d1630f6b863ebd8174b6684"
}
@@ -1,44 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n date_utc as \"date_utc!\",\n packets_received as \"total_packets_received!: i64\",\n packets_sent as \"total_packets_sent!: i64\",\n packets_dropped as \"total_packets_dropped!: i64\",\n total_stake as \"total_stake!: i64\"\n FROM (\n SELECT\n date_utc,\n SUM(packets_received) as packets_received,\n SUM(packets_sent) as packets_sent,\n SUM(packets_dropped) as packets_dropped,\n SUM(total_stake) as total_stake\n FROM mixnode_daily_stats\n GROUP BY date_utc\n ORDER BY date_utc DESC\n LIMIT 30\n )\n GROUP BY date_utc\n ORDER BY date_utc\n ",
"describe": {
"columns": [
{
"name": "date_utc!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "total_packets_received!: i64",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "total_packets_sent!: i64",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "total_packets_dropped!: i64",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!: i64",
"ordinal": 4,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false,
true,
true,
true,
false
]
},
"hash": "c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351"
}
@@ -1,32 +0,0 @@
{
"db_name": "SQLite",
"query": "SELECT\n id as \"id!\",\n identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM mixnodes\n WHERE bonded = ?",
"describe": {
"columns": [
{
"name": "id!",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "identity_key!",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "bonded: bool",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "\n SELECT mix_id\n FROM mixnodes\n WHERE bonded = true\n ",
"describe": {
"columns": [
{
"name": "mix_id",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "c910788edefe64bbb34379702bcbde9ec6159c9fa03b13652e1f620dcd92125e"
}
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n total_stake\n FROM mixnodes\n WHERE mix_id = ?\n ",
"query": "\n SELECT\n total_stake\n FROM nym_nodes\n WHERE node_id = ?\n ",
"describe": {
"columns": [
{
@@ -16,5 +16,5 @@
false
]
},
"hash": "969acbff4b128d3a3dc998bb5db1a4c8b4c106767e0276435dc23fcfd6acfe4f"
"hash": "d2e07d44594ca5b44a6100482ff432c39d761f2a0ac1d6515cf73416f2eb6c61"
}
@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "INSERT INTO gateways\n (gateway_identity_key, bonded, blacklisted,\n self_described, explorer_pretty_bond,\n last_updated_utc, performance)\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(gateway_identity_key) DO UPDATE SET\n bonded=excluded.bonded,\n blacklisted=excluded.blacklisted,\n self_described=excluded.self_described,\n explorer_pretty_bond=excluded.explorer_pretty_bond,\n last_updated_utc=excluded.last_updated_utc,\n performance = excluded.performance;",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n COALESCE(packets_received, 0) as packets_received,\n COALESCE(packets_sent, 0) as packets_sent,\n COALESCE(packets_dropped, 0) as packets_dropped\n FROM mixnode_packet_stats_raw\n WHERE mix_id = ?\n ORDER BY timestamp_utc DESC\n LIMIT 1 OFFSET 1\n ",
"describe": {
"columns": [
{
"name": "packets_received",
"ordinal": 0,
"type_info": "Int64"
},
{
"name": "packets_sent",
"ordinal": 1,
"type_info": "Int64"
},
{
"name": "packets_dropped",
"ordinal": 2,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "f1f47a4490c3e1330885ef3cf3cda054f2cf760520a46a94db22a02a9cb53dba"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO gateway_description (\n gateway_identity_key,\n moniker,\n website,\n security_contact,\n details,\n last_updated_utc\n ) VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (gateway_identity_key) DO UPDATE SET\n moniker = excluded.moniker,\n website = excluded.website,\n security_contact = excluded.security_contact,\n details = excluded.details,\n last_updated_utc = excluded.last_updated_utc\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "f2a7e7229b414a6283685558d1d0731d21b1a92eb39481cf342b87c3b5b0f759"
}
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "SELECT\n mn.mix_id as \"mix_id!\",\n mn.bonded as \"bonded: bool\",\n mn.blacklisted as \"blacklisted: bool\",\n mn.is_dp_delegatee as \"is_dp_delegatee: bool\",\n mn.total_stake as \"total_stake!\",\n mn.full_details as \"full_details!\",\n mn.self_described as \"self_described\",\n mn.last_updated_utc as \"last_updated_utc!\",\n COALESCE(md.moniker, \"NA\") as \"moniker!\",\n COALESCE(md.website, \"NA\") as \"website!\",\n COALESCE(md.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(md.details, \"NA\") as \"details!\"\n FROM mixnodes mn\n LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id\n ORDER BY mn.mix_id",
"query": "SELECT\n mn.mix_id as \"mix_id!\",\n mn.bonded as \"bonded: bool\",\n mn.is_dp_delegatee as \"is_dp_delegatee: bool\",\n mn.total_stake as \"total_stake!\",\n mn.full_details as \"full_details!\",\n mn.self_described as \"self_described\",\n mn.last_updated_utc as \"last_updated_utc!\",\n COALESCE(md.moniker, \"NA\") as \"moniker!\",\n COALESCE(md.website, \"NA\") as \"website!\",\n COALESCE(md.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(md.details, \"NA\") as \"details!\"\n FROM mixnodes mn\n LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id\n ORDER BY mn.mix_id",
"describe": {
"columns": [
{
@@ -14,53 +14,48 @@
"type_info": "Int64"
},
{
"name": "blacklisted: bool",
"name": "is_dp_delegatee: bool",
"ordinal": 2,
"type_info": "Int64"
},
{
"name": "is_dp_delegatee: bool",
"name": "total_stake!",
"ordinal": 3,
"type_info": "Int64"
},
{
"name": "total_stake!",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "full_details!",
"ordinal": 5,
"ordinal": 4,
"type_info": "Text"
},
{
"name": "self_described",
"ordinal": 6,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "last_updated_utc!",
"ordinal": 7,
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "moniker!",
"ordinal": 8,
"ordinal": 7,
"type_info": "Text"
},
{
"name": "website!",
"ordinal": 9,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "security_contact!",
"ordinal": 10,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "details!",
"ordinal": 11,
"ordinal": 10,
"type_info": "Text"
}
],
@@ -72,7 +67,6 @@
false,
false,
false,
false,
true,
true,
false,
@@ -82,5 +76,5 @@
false
]
},
"hash": "f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35"
"hash": "f75af377da33db1455c6e0f612e0fa9583888f343b8b59faf37fc6799b244379"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE\n gateways\n SET\n bonded = false\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "f7e3fa31d68c028bf39cc95389f29f8758ec922dd2e7ea064a1e537e580c9ee5"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT\n node_id as \"node_id!: i64\"\n FROM\n nym_nodes\n ",
"describe": {
"columns": [
{
"name": "node_id!: i64",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 0
},
"nullable": [
true
]
},
"hash": "fb2fcd26974ce4c3f5055fa2ca8c266223814c19ab2a5763a0575274d684d02e"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO nym_nodes_packet_stats_raw (\n node_id, timestamp_utc, packets_received, packets_sent, packets_dropped\n ) VALUES (?, ?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "fcb1698d9e0e3a14524c92e7c99a811588c2bbc50d4975487a0464321a1b18c9"
}
@@ -3,9 +3,7 @@
[package]
name = "nym-node-status-api"
version = "1.0.2"
version = "1.0.3"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -5,13 +5,8 @@ set -e
user_rust_log_preference=$RUST_LOG
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120
if [ "$ENVIRONMENT" = "mainnet" ]; then
export NYM_NODE_STATUS_API_HM_URL="https://harbourmaster.nymtech.net"
fi
# public counterpart of the agent's private key.
# For TESTING only. NOT used in any other environment
export NODE_STATUS_API_AGENT_KEY_LIST="H4z8kx5Kkf5JMQHhxaW1MwYndjKCDHC7HsVhHTFfBZ4J"
@@ -0,0 +1,61 @@
ALTER TABLE nym_nodes ADD COLUMN self_described TEXT;
ALTER TABLE nym_nodes ADD COLUMN bond_info TEXT;
-- # Why recreate tables?
-- I need DELETE with CASCADE functionality, but ALTER TABLE doesn't support
-- adding constraints (which CASCADE is). So I recreate tables with proper
-- constraints and fill them with existing data.
-- To avoid invalidating existing FK references, temporarily disable FK enforcement.
PRAGMA foreign_keys=off;
DROP INDEX IF EXISTS idx_nym_nodes_packet_stats_raw_node_id_timestamp_utc;
ALTER TABLE nym_node_descriptions RENAME TO _nym_node_descriptions_old;
ALTER TABLE nym_nodes_packet_stats_raw RENAME TO _nym_nodes_packet_stats_raw_old;
ALTER TABLE nym_node_daily_mixing_stats RENAME TO _nym_node_daily_mixing_stats_old;
CREATE TABLE nym_node_descriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id INTEGER UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (node_id) REFERENCES nym_nodes (node_id) ON DELETE CASCADE
);
CREATE TABLE nym_nodes_packet_stats_raw (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id INTEGER NOT NULL,
timestamp_utc INTEGER NOT NULL,
packets_received INTEGER,
packets_sent INTEGER,
packets_dropped INTEGER,
FOREIGN KEY (node_id) REFERENCES nym_nodes (node_id) ON DELETE CASCADE
);
CREATE INDEX idx_nym_nodes_packet_stats_raw_node_id_timestamp_utc ON nym_nodes_packet_stats_raw (node_id, timestamp_utc);
CREATE TABLE nym_node_daily_mixing_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id INTEGER NOT NULL,
total_stake BIGINT NOT NULL,
date_utc VARCHAR NOT NULL,
packets_received INTEGER DEFAULT 0,
packets_sent INTEGER DEFAULT 0,
packets_dropped INTEGER DEFAULT 0,
FOREIGN KEY (node_id) REFERENCES nym_nodes (node_id) ON DELETE CASCADE,
UNIQUE (node_id, date_utc) -- This constraint automatically creates an index
);
INSERT INTO nym_node_descriptions SELECT * FROM _nym_node_descriptions_old;
INSERT INTO nym_nodes_packet_stats_raw SELECT * FROM _nym_nodes_packet_stats_raw_old;
INSERT INTO nym_node_daily_mixing_stats SELECT * FROM _nym_node_daily_mixing_stats_old;
DROP TABLE _nym_node_descriptions_old;
DROP TABLE _nym_nodes_packet_stats_raw_old;
DROP TABLE _nym_node_daily_mixing_stats_old;
PRAGMA foreign_keys=on;
@@ -1,5 +1,9 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
use sqlx::{
migrate::Migrator,
sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteSynchronous},
ConnectOptions, SqlitePool,
};
use std::str::FromStr;
pub(crate) mod models;
@@ -16,6 +20,10 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options = SqliteConnectOptions::from_str(&connection_url)?
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.foreign_keys(true)
.create_if_missing(true)
.disable_statement_logging();
@@ -2,20 +2,33 @@ use std::str::FromStr;
use crate::{
http::{self, models::SummaryHistory},
utils::NumericalCheckedCast,
utils::{decimal_to_i64, NumericalCheckedCast},
};
use anyhow::Context;
use nym_contracts_common::Percent;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_network_defaults::DEFAULT_NYM_NODE_HTTP_PORT;
use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::nym_api::SkimmedNode;
use nym_validator_client::{
client::NymNodeDetails, models::NymNodeDescription, nym_api::SkimmedNode,
};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use strum_macros::{EnumString, FromRepr};
use time::{Date, OffsetDateTime};
use utoipa::ToSchema;
macro_rules! serialize_opt_to_value {
($var:expr) => {{
match $var {
None => Ok(None),
Some(ref value) => serde_json::to_value(value).map(Some).map_err(|err| {
anyhow::anyhow!("Failed to serialize {}: {:?}", stringify!($var), err)
}),
}
}};
}
pub(crate) struct GatewayInsertRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
@@ -360,6 +373,7 @@ impl TryFrom<GatewaySessionsRecord> for http::models::SessionStats {
}
}
#[derive(strum_macros::Display)]
pub(crate) enum ScrapeNodeKind {
LegacyMixnode { mix_id: i64 },
MixingNymNode { node_id: i64 },
@@ -406,11 +420,11 @@ impl ScraperNodeInfo {
}
}
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
#[derive(sqlx::Decode, Debug)]
pub(crate) struct NymNodeDto {
pub node_id: i64,
pub ed25519_identity_pubkey: String,
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
pub total_stake: i64,
pub ip_addresses: serde_json::Value,
pub mix_port: i64,
@@ -419,8 +433,11 @@ pub(crate) struct NymNodeDto {
pub supported_roles: serde_json::Value,
pub entry: Option<serde_json::Value>,
pub performance: String,
pub self_described: Option<serde_json::Value>,
pub bond_info: Option<serde_json::Value>,
}
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
#[derive(Debug)]
pub(crate) struct NymNodeInsertRecord {
pub node_id: i64,
@@ -433,13 +450,27 @@ pub(crate) struct NymNodeInsertRecord {
pub supported_roles: serde_json::Value,
pub performance: String,
pub entry: Option<serde_json::Value>,
pub self_described: Option<serde_json::Value>,
pub bond_info: Option<serde_json::Value>,
pub last_updated_utc: String,
}
impl NymNodeInsertRecord {
pub fn new(skimmed_node: SkimmedNode, total_stake: i64) -> anyhow::Result<Self> {
pub fn new(
skimmed_node: SkimmedNode,
bond_info: Option<&NymNodeDetails>,
self_described: Option<&NymNodeDescription>,
) -> anyhow::Result<Self> {
let now = OffsetDateTime::now_utc().to_string();
// if bond info is missing, set stake to 0
let total_stake = bond_info
.map(|info| decimal_to_i64(info.total_stake()))
.unwrap_or(0);
let entry = serialize_opt_to_value!(skimmed_node.entry)?;
let bond_info = serialize_opt_to_value!(bond_info)?;
let self_described = serialize_opt_to_value!(self_described)?;
let record = Self {
node_id: skimmed_node.node_id.into(),
ed25519_identity_pubkey: skimmed_node.ed25519_identity_pubkey.to_base58_string(),
@@ -450,10 +481,9 @@ impl NymNodeInsertRecord {
node_role: serde_json::to_value(&skimmed_node.role)?,
supported_roles: serde_json::to_value(skimmed_node.supported_roles)?,
performance: skimmed_node.performance.value().to_string(),
entry: match skimmed_node.entry {
Some(entry) => Some(serde_json::to_value(entry)?),
None => None,
},
entry,
self_described,
bond_info,
last_updated_utc: now,
};
@@ -30,11 +30,22 @@ pub(crate) async fn select_gateway_identity(
Ok(record.gateway_identity_key)
}
pub(crate) async fn insert_gateways(
pub(crate) async fn update_bonded_gateways(
pool: &DbPool,
gateways: Vec<GatewayInsertRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
let mut tx = pool.begin().await?;
sqlx::query!(
r#"UPDATE
gateways
SET
bonded = false
"#,
)
.execute(&mut *tx)
.await?;
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
@@ -55,10 +66,12 @@ pub(crate) async fn insert_gateways(
record.last_updated_utc,
record.performance
)
.execute(&mut *db)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
@@ -101,7 +114,7 @@ pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gatewa
Ok(items)
}
pub(crate) async fn get_all_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
pub(crate) async fn get_bonded_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
@@ -11,12 +11,24 @@ use crate::{
http::models::{DailyStats, Mixnode},
};
pub(crate) async fn insert_mixnodes(
pub(crate) async fn update_mixnodes(
pool: &DbPool,
mixnodes: Vec<MixnodeRecord>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let mut tx = pool.begin().await?;
// mark all as unbonded
sqlx::query!(
r#"UPDATE
mixnodes
SET
bonded = false
"#,
)
.execute(&mut *tx)
.await?;
// existing nodes get updated on insert
for record in mixnodes.iter() {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
@@ -43,10 +55,12 @@ pub(crate) async fn insert_mixnodes(
record.last_updated_utc,
record.is_dp_delegatee
)
.execute(&mut *conn)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
@@ -126,7 +140,7 @@ pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailySt
Ok(items)
}
pub(crate) async fn get_all_mix_ids(pool: &DbPool) -> anyhow::Result<HashSet<i64>> {
pub(crate) async fn get_bonded_mix_ids(pool: &DbPool) -> anyhow::Result<HashSet<i64>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
@@ -9,12 +9,15 @@ mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
get_all_gateway_id_keys, get_all_gateways, insert_gateways, select_gateway_identity,
get_all_gateways, get_bonded_gateway_id_keys, select_gateway_identity, update_bonded_gateways,
};
pub(crate) use gateways_stats::{delete_old_records, get_sessions_stats, insert_session_records};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{get_all_mix_ids, get_all_mixnodes, get_daily_stats, insert_mixnodes};
pub(crate) use nym_nodes::{get_nym_nodes, insert_nym_nodes};
pub(crate) use mixnodes::{get_all_mixnodes, get_bonded_mix_ids, get_daily_stats, update_mixnodes};
pub(crate) use nym_nodes::{
get_active_nym_nodes, get_all_nym_nodes, get_described_node_bond_info, get_node_descriptions,
update_nym_nodes,
};
pub(crate) use packet_stats::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
};
@@ -1,22 +1,21 @@
use std::collections::HashMap;
use anyhow::Context;
use futures_util::TryStreamExt;
use nym_validator_client::{client::NymNodeDetails, nym_api::SkimmedNode};
use nym_validator_client::{
client::{NodeId, NymNodeDetails},
models::NymNodeDescription,
};
use std::collections::{HashMap, HashSet};
use tracing::instrument;
use crate::{
db::{
models::{NymNodeDto, NymNodeInsertRecord},
DbPool,
},
utils::decimal_to_i64,
use crate::db::{
models::{NymNodeDto, NymNodeInsertRecord},
DbPool,
};
pub(crate) async fn get_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<SkimmedNode>> {
pub(crate) async fn get_all_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<NymNodeDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
@@ -28,44 +27,77 @@ pub(crate) async fn get_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<SkimmedNo
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<NymNodeDto>>()
.await?;
let mut skimmed_nodes = Vec::new();
for item in items {
let node_id = item.node_id;
match SkimmedNode::try_from(item) {
Ok(node) => skimmed_nodes.push(node),
Err(e) => {
tracing::warn!("Failed to decode node_id={}: {}", node_id, e);
}
}
}
Ok(skimmed_nodes)
.await
.map_err(From::from)
}
#[instrument(level = "debug", skip_all)]
pub(crate) async fn insert_nym_nodes(
pool: &DbPool,
nym_nodes: Vec<SkimmedNode>,
bonded_node_info: &HashMap<u32, NymNodeDetails>,
) -> anyhow::Result<()> {
/// if a node doesn't expose its self-described endpoint, it can't route traffic
/// - https://nym.com/docs/operators/nodes/nym-node/bonding
///
/// same if it's not bonded in the mixnet smart contract
/// - https://nym.com/docs/operators/tokenomics/mixnet-rewards#rewarded-set-selection
pub(crate) async fn get_active_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<NymNodeDto>> {
let mut conn = pool.acquire().await?;
for nym_node in nym_nodes.into_iter() {
let total_stake = bonded_node_info
.get(&nym_node.node_id)
.map(|details| decimal_to_i64(details.total_stake()))
.unwrap_or(0);
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
ed25519_identity_pubkey,
total_stake,
ip_addresses as "ip_addresses!: serde_json::Value",
mix_port,
x25519_sphinx_pubkey,
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
WHERE
self_described IS NOT NULL
AND
bond_info IS NOT NULL
"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<NymNodeDto>>()
.await
.map_err(From::from)
}
let record = NymNodeInsertRecord::new(nym_node, total_stake)?;
#[instrument(level = "debug", skip_all, fields(node_records=node_records.len()))]
pub(crate) async fn update_nym_nodes(
pool: &DbPool,
node_records: Vec<NymNodeInsertRecord>,
) -> anyhow::Result<usize> {
let mut tx = pool.begin().await?;
let mut nodes_to_delete = sqlx::query!(
r#"SELECT
node_id as "node_id!: i64"
FROM
nym_nodes
"#,
)
.fetch_all(&mut *tx)
.await
.map(|records| records.into_iter().map(|record| record.node_id as NodeId))?
.collect::<HashSet<NodeId>>();
let inserted = node_records.len();
for record in node_records {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
"INSERT INTO nym_nodes
@@ -74,9 +106,11 @@ pub(crate) async fn insert_nym_nodes(
ip_addresses, mix_port,
x25519_sphinx_pubkey, node_role,
supported_roles, entry,
self_described,
bond_info,
performance, last_updated_utc
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(node_id) DO UPDATE SET
ed25519_identity_pubkey=excluded.ed25519_identity_pubkey,
ip_addresses=excluded.ip_addresses,
@@ -85,6 +119,8 @@ pub(crate) async fn insert_nym_nodes(
node_role=excluded.node_role,
supported_roles=excluded.supported_roles,
entry=excluded.entry,
self_described=excluded.self_described,
bond_info=excluded.bond_info,
performance=excluded.performance,
last_updated_utc=excluded.last_updated_utc
;",
@@ -97,13 +133,104 @@ pub(crate) async fn insert_nym_nodes(
record.node_role,
record.supported_roles,
record.entry,
record.self_described,
record.bond_info,
record.performance,
record.last_updated_utc,
)
.execute(&mut *conn)
.execute(&mut *tx)
.await
.with_context(|| format!("node_id={}", record.node_id))?;
.with_context(|| format!("Failed to INSERT node_id={}", record.node_id))?;
// if node was updated, remove it from the list
nodes_to_delete.remove(&(record.node_id as NodeId));
}
Ok(())
if !nodes_to_delete.is_empty() {
tracing::debug!("DELETING {} obsolete nodes", nodes_to_delete.len());
}
// clean up leftover nodes, which weren't inserted/updated
for node_id in nodes_to_delete {
sqlx::query!(
"DELETE FROM nym_nodes
WHERE node_id = ?",
node_id,
)
.execute(&mut *tx)
.await
.map_err(|e| anyhow::anyhow!("Failed to DELETE node_id={}: {}", node_id, e))?;
}
tx.commit().await?;
Ok(inserted)
}
pub(crate) async fn get_described_node_bond_info(
pool: &DbPool,
) -> anyhow::Result<HashMap<NodeId, NymNodeDetails>> {
let mut conn = pool.acquire().await?;
sqlx::query!(
r#"SELECT
node_id,
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
WHERE
bond_info IS NOT NULL
AND
self_described IS NOT NULL
"#,
)
.fetch_all(&mut *conn)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
record
.bond_info
// only return details for nodes which have details stored
.and_then(|bond_info| serde_json::from_value::<NymNodeDetails>(bond_info).ok())
.map(|res| (record.node_id as NodeId, res))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
pub(crate) async fn get_node_descriptions(
pool: &DbPool,
) -> anyhow::Result<HashMap<NodeId, NymNodeDescription>> {
let mut conn = pool.acquire().await?;
sqlx::query!(
r#"SELECT
node_id,
self_described as "self_described: serde_json::Value"
FROM
nym_nodes
WHERE
self_described IS NOT NULL
"#,
)
.fetch_all(&mut *conn)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
record
.self_described
// only return details for nodes which have details stored
.and_then(|description| {
serde_json::from_value::<NymNodeDescription>(description).ok()
})
.map(|res| (record.node_id as NodeId, res))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
@@ -7,45 +7,56 @@ use crate::{
};
use anyhow::Result;
use chrono::Utc;
use nym_validator_client::nym_api::SkimmedNode;
pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
let mut nodes_to_scrape = Vec::new();
let mixnode_ids = queries::get_all_mix_ids(pool).await?;
let gateway_keys = queries::get_all_gateway_id_keys(pool).await?;
let mixnode_ids = queries::get_bonded_mix_ids(pool).await?;
let gateway_keys = queries::get_bonded_gateway_id_keys(pool).await?;
let mut entry_exit_nodes = 0;
queries::get_nym_nodes(pool)
.await?
.into_iter()
.for_each(|node| {
// due to polyfilling, Nym nodes table might contain legacy mixnodes
// as well. Mark them as such here.
let node_kind = if mixnode_ids.contains(&node.node_id.into()) {
ScrapeNodeKind::LegacyMixnode {
mix_id: node.node_id.into(),
let skimmed_nodes = queries::get_active_nym_nodes(pool).await.map(|nodes_dto| {
nodes_dto.into_iter().filter_map(|node| {
let node_id = node.node_id;
match SkimmedNode::try_from(node) {
Ok(node) => Some(node),
Err(e) => {
tracing::error!("Failed to decode node_id={}: {}", node_id, e);
None
}
} else if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
entry_exit_nodes += 1;
ScrapeNodeKind::EntryExitNymNode {
node_id: node.node_id.into(),
identity_key: node.ed25519_identity_pubkey.to_base58_string(),
}
} else {
ScrapeNodeKind::MixingNymNode {
node_id: node.node_id.into(),
}
};
nodes_to_scrape.push(ScraperNodeInfo {
node_kind,
hosts: node
.ip_addresses
.into_iter()
.map(|ip| ip.to_string())
.collect::<Vec<_>>(),
http_api_port: node.mix_port.into(),
})
});
}
})
})?;
skimmed_nodes.for_each(|node| {
// TODO: relies on polyfilling: Nym nodes table might contain legacy mixnodes
// as well. Categorize them here.
let node_kind = if mixnode_ids.contains(&node.node_id.into()) {
ScrapeNodeKind::LegacyMixnode {
mix_id: node.node_id.into(),
}
} else if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
entry_exit_nodes += 1;
ScrapeNodeKind::EntryExitNymNode {
node_id: node.node_id.into(),
identity_key: node.ed25519_identity_pubkey.to_base58_string(),
}
} else {
ScrapeNodeKind::MixingNymNode {
node_id: node.node_id.into(),
}
};
nodes_to_scrape.push(ScraperNodeInfo {
node_kind,
hosts: node
.ip_addresses
.into_iter()
.map(|ip| ip.to_string())
.collect::<Vec<_>>(),
http_api_port: node.mix_port.into(),
})
});
tracing::debug!("Fetched {} 🌟 total nym nodes", nodes_to_scrape.len());
tracing::debug!("Fetched {} 🚪 entry/exit nodes", entry_exit_nodes);
@@ -10,6 +10,7 @@ use crate::http::{server::HttpServer, state::AppState};
pub(crate) mod gateways;
pub(crate) mod metrics;
pub(crate) mod mixnodes;
pub(crate) mod nym_nodes;
pub(crate) mod services;
pub(crate) mod summary;
pub(crate) mod testruns;
@@ -38,6 +39,7 @@ impl RouterBuilder {
.nest("/summary", summary::routes())
.nest("/metrics", metrics::routes()),
)
.nest("/v3", Router::new().nest("/nym-nodes", nym_nodes::routes()))
.nest(
"/internal",
Router::new().nest("/testruns", testruns::routes()),
@@ -0,0 +1,42 @@
use axum::{
extract::{Query, State},
Json, Router,
};
use tracing::instrument;
use crate::http::{
error::{HttpError, HttpResult},
models::ExtendedNymNode,
state::AppState,
PagedResult, Pagination,
};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(nym_nodes))
}
#[utoipa::path(
tag = "Nym Nodes",
get,
params(
Pagination
),
path = "/v3/nym-nodes",
responses(
(status = 200, body = PagedResult<ExtendedNymNode>)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(page=pagination.page, size=pagination.size))]
async fn nym_nodes(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<ExtendedNymNode>>> {
let db = state.db_pool();
let nodes = state.cache().get_nym_nodes_list(db).await.map_err(|e| {
tracing::error!("{e}");
HttpError::internal()
})?;
Ok(Json(PagedResult::paginate(pagination, nodes)))
}
@@ -1,4 +1,6 @@
use cosmwasm_std::Decimal;
use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::client::NodeId;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
@@ -45,6 +47,23 @@ pub struct Mixnode {
pub last_updated_utc: String,
}
#[derive(Clone, Debug, utoipa::ToSchema, Deserialize, Serialize)]
pub(crate) struct ExtendedNymNode {
pub(crate) node_id: NodeId,
pub(crate) identity_key: String,
pub(crate) uptime: f64,
#[schema(value_type = String)]
pub(crate) total_stake: Decimal,
pub(crate) original_pledge: u128,
pub(crate) bonding_address: String,
pub(crate) bonded: bool,
pub(crate) node_type: String,
pub(crate) ip_address: String,
pub(crate) accepted_tnc: bool,
pub(crate) description: serde_json::Value,
pub(crate) rewarding_details: serde_json::Value,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DailyStats {
pub date_utc: String,
@@ -1,12 +1,16 @@
use std::{sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};
use cosmwasm_std::Decimal;
use moka::{future::Cache, Entry};
use nym_contracts_common::NaiveFloat;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_validator_client::{models::DescribedNodeType, nym_api::SkimmedNode};
use tokio::sync::RwLock;
use tracing::instrument;
use crate::{
db::DbPool,
http::models::{DailyStats, Gateway, Mixnode, SummaryHistory},
db::{queries, DbPool},
http::models::{DailyStats, ExtendedNymNode, Gateway, Mixnode, SummaryHistory},
};
use super::models::SessionStats;
@@ -53,6 +57,7 @@ impl AppState {
static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static NYM_NODES_LIST_KEY: &str = "nym_nodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
static SESSION_STATS_LIST_KEY: &str = "session-stats";
@@ -63,6 +68,7 @@ const MIXNODE_STATS_HISTORY_DAYS: usize = 30;
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
nym_nodes: Cache<String, Arc<RwLock<Vec<ExtendedNymNode>>>>,
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
session_stats: Cache<String, Arc<RwLock<Vec<SessionStats>>>>,
@@ -79,6 +85,10 @@ impl HttpCache {
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
nym_nodes: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixstats: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
@@ -181,6 +191,47 @@ impl HttpCache {
}
}
pub async fn upsert_nym_node_list(
&self,
nym_node_list: Vec<ExtendedNymNode>,
) -> Entry<String, Arc<RwLock<Vec<ExtendedNymNode>>>> {
self.nym_nodes
.entry_by_ref(NYM_NODES_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = nym_node_list;
v.clone()
} else {
Arc::new(RwLock::new(nym_node_list))
}
})
.await
}
pub async fn get_nym_nodes_list(&self, db: &DbPool) -> anyhow::Result<Vec<ExtendedNymNode>> {
match self.nym_nodes.get(NYM_NODES_LIST_KEY).await {
Some(guard) => {
tracing::trace!("Fetching from cache...");
let read_lock = guard.read().await;
Ok(read_lock.clone())
}
None => {
tracing::trace!("No nym nodes in cache, refreshing cache from DB...");
let nym_nodes = aggregate_node_info_from_db(db).await.inspect(|nym_nodes| {
if nym_nodes.is_empty() {
tracing::warn!("Database contains 0 nym nodes");
}
})?;
self.upsert_nym_node_list(nym_nodes.clone()).await;
Ok(nym_nodes)
}
}
}
pub async fn upsert_mixnode_stats(
&self,
mixnode_stats: Vec<DailyStats>,
@@ -293,3 +344,81 @@ impl HttpCache {
.await
}
}
#[instrument(level = "info", skip_all)]
async fn aggregate_node_info_from_db(pool: &DbPool) -> anyhow::Result<Vec<ExtendedNymNode>> {
let node_bond_info = queries::get_described_node_bond_info(pool).await?;
tracing::debug!("Described nodes with bond info: {}", node_bond_info.len());
let skimmed_nodes = queries::get_all_nym_nodes(pool).await.map(|records| {
records
.into_iter()
.filter_map(|dto| SkimmedNode::try_from(dto).ok())
.map(|skimmed_node| (skimmed_node.node_id, skimmed_node))
.collect::<HashMap<_, _>>()
})?;
tracing::debug!("Skimmed nodes: {}", skimmed_nodes.len());
let described_nodes = queries::get_node_descriptions(pool).await?;
tracing::debug!("Described nodes: {}", described_nodes.len());
let mut parsed_nym_nodes = Vec::new();
for (node_id, described_node) in described_nodes {
let bond_details = node_bond_info.get(&node_id);
let bonded = bond_details.is_some();
let total_stake = bond_details
.map(|details| details.total_stake())
.unwrap_or(Decimal::zero());
let identity_key = described_node.ed25519_identity_key().to_string();
let original_pledge = bond_details
.map(|details| details.original_pledge().amount.u128())
.unwrap_or(0u128);
let rewarding_details = &node_bond_info
.get(&node_id)
.map(|details| details.rewarding_details.clone());
let uptime = skimmed_nodes
.get(&node_id)
.map(|node| node.performance.naive_to_f64())
.unwrap_or(0.0);
let node_type = match described_node.contract_node_type {
DescribedNodeType::NymNode => "nym_node".to_string(),
DescribedNodeType::LegacyMixnode => "legacy_mixnode".to_string(),
DescribedNodeType::LegacyGateway => "legacy_gateway".to_string(),
};
let ip_address = described_node
.description
.host_information
.ip_address
.first()
.map(ToString::to_string)
.unwrap_or_default();
let accepted_tnc = described_node
.description
.auxiliary_details
.accepted_operator_terms_and_conditions;
let description = described_node.description;
let bonding_address = bond_details
.map(|details| details.bond_information.owner.to_string())
.unwrap_or_default();
parsed_nym_nodes.push(ExtendedNymNode {
node_id,
identity_key,
total_stake,
uptime,
ip_address,
original_pledge,
bonding_address,
bonded,
node_type,
accepted_tnc,
description: serde_json::to_value(description).unwrap_or_default(),
rewarding_details: serde_json::to_value(rewarding_details).unwrap_or_default(),
});
}
Ok(parsed_nym_nodes)
}
@@ -134,8 +134,9 @@ impl Scraper {
node.node_id()
),
Err(e) => debug!(
"📝 ❌ Description task #{} for node {} failed: {}",
"📝 ❌ Description task #{} for {} {} failed: {}",
task_id,
node.node_kind,
node.node_id(),
e
),
@@ -178,8 +179,9 @@ impl Scraper {
node.node_id()
),
Err(e) => debug!(
"📊 ❌ Packet stats task #{} for node {} failed: {}",
"📊 ❌ Packet stats task #{} for {} {} failed: {}",
task_id,
node.node_kind,
node.node_id(),
e
),
@@ -1,9 +1,10 @@
#![allow(deprecated)]
use crate::db::models::{
gateway, mixnode, GatewayInsertRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_HISTORICAL_COUNT, MIXNODES_LEGACY_COUNT, NYMNODES_DESCRIBED_COUNT, NYMNODE_COUNT,
gateway, mixnode, GatewayInsertRecord, MixnodeRecord, NetworkSummary, NymNodeInsertRecord,
ASSIGNED_ENTRY_COUNT, ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT,
GATEWAYS_HISTORICAL_COUNT, MIXNODES_HISTORICAL_COUNT, MIXNODES_LEGACY_COUNT,
NYMNODES_DESCRIBED_COUNT, NYMNODE_COUNT,
};
use crate::db::{queries, DbPool};
use crate::monitor::geodata::{Location, NodeGeoData};
@@ -11,7 +12,7 @@ use crate::utils::{decimal_to_i64, LogError, NumericalCheckedCast};
use anyhow::anyhow;
use moka::future::Cache;
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::{NodeId, NymApiClientExt};
use nym_validator_client::client::{NodeId, NymApiClientExt, NymNodeDetails};
use nym_validator_client::models::{
LegacyDescribedMixNode, MixNodeBondAnnotated, NymNodeDescription,
};
@@ -107,19 +108,24 @@ impl Monitor {
let described_nodes = api_client
.get_all_described_nodes()
.await
.log_error("get_all_described_nodes")?;
.log_error("get_all_described_nodes")?
.into_iter()
.map(|elem| (elem.node_id, elem))
.collect::<HashMap<_, _>>();
tracing::info!("🟣 described nodes: {}", described_nodes.len());
let gateways = described_nodes
.iter()
.filter(|node| {
node.description.declared_role.entry
|| node.description.declared_role.exit_ipr
|| node.description.declared_role.exit_nr
.filter_map(|(_, node)| {
if node.description.declared_role.entry || node.description.declared_role.exit_ipr {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
let bonded_node_info = api_client
let bonded_nym_nodes = api_client
.get_all_bonded_nym_nodes()
.await?
.into_iter()
@@ -127,22 +133,27 @@ impl Monitor {
// for faster reads
.collect::<HashMap<_, _>>();
tracing::info!("🟣 bonded_nodes: {}", bonded_node_info.len());
tracing::info!("🟣 bonded_nodes: {}", bonded_nym_nodes.len());
// returns only bonded nodes
let nym_nodes = api_client
.get_all_basic_nodes()
.await
.log_error("get_all_basic_nodes")?;
queries::insert_nym_nodes(&self.db_pool, nym_nodes.clone(), &bonded_node_info)
tracing::info!("🟣 get_all_basic_nodes: {}", nym_nodes.len());
let nym_node_records =
self.prepare_nym_node_data(nym_nodes.clone(), &bonded_nym_nodes, &described_nodes);
queries::update_nym_nodes(&self.db_pool, nym_node_records)
.await
.map(|_| {
tracing::debug!("{} nym nodes written to DB!", nym_nodes.len());
.map(|inserted| {
tracing::debug!("{} nym nodes written to DB!", inserted);
})?;
let mut gateway_geodata = Vec::new();
for gateway in gateways.iter() {
if let Some(node_details) = bonded_node_info.get(&gateway.node_id) {
if let Some(node_details) = bonded_nym_nodes.get(&gateway.node_id) {
let bond_info = &node_details.bond_information;
let gw_geodata = NodeGeoData {
identity_key: bond_info.node.identity_key.to_owned(),
@@ -170,6 +181,7 @@ impl Monitor {
.map(|elem| elem.identity_key().to_owned())
.collect::<HashSet<_>>();
// TODO this assumes polyfilling of legacy mixnodes on Nym API
let mixnodes_legacy = nym_nodes
.iter()
.filter(|node| {
@@ -208,11 +220,12 @@ impl Monitor {
let assigned_mixing_count = mixing_assigned_nodes.len();
let count_legacy_mixnodes = mixnodes_legacy.len();
let gateway_records = self.prepare_gateway_data(&gateways, gateway_geodata, &nym_nodes)?;
let gateway_records =
self.prepare_gateway_data(&gateways, gateway_geodata, &nym_nodes, &bonded_nym_nodes)?;
let pool = self.db_pool.clone();
let gateways_count = gateway_records.len();
queries::insert_gateways(&pool, gateway_records)
queries::update_bonded_gateways(&pool, gateway_records)
.await
.map(|_| {
tracing::debug!("{} gateway records written to DB!", gateways_count);
@@ -224,7 +237,7 @@ impl Monitor {
delegation_program_members,
)?;
let mixnodes_count = mixnode_records.len();
queries::insert_mixnodes(&pool, mixnode_records)
queries::update_mixnodes(&pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("{} mixnode info written to DB!", mixnodes_count);
@@ -317,17 +330,45 @@ impl Monitor {
}
}
fn prepare_nym_node_data(
&self,
skimmed_nodes: Vec<SkimmedNode>,
bonded_node_info: &HashMap<NodeId, NymNodeDetails>,
described_nodes: &HashMap<NodeId, NymNodeDescription>,
) -> Vec<NymNodeInsertRecord> {
skimmed_nodes
.into_iter()
.filter_map(|skimmed_node| {
let node_id = skimmed_node.node_id;
let bond_info = bonded_node_info.get(&skimmed_node.node_id);
let self_described = described_nodes.get(&skimmed_node.node_id);
match NymNodeInsertRecord::new(skimmed_node, bond_info, self_described) {
Ok(record) => Some(record),
Err(err) => {
tracing::error!(
"Failed to create insert record for node {}: {}",
node_id,
err
);
None
}
}
})
.collect::<Vec<_>>()
}
fn prepare_gateway_data(
&self,
described_gateways: &[&NymNodeDescription],
gateway_geodata: Vec<NodeGeoData>,
skimmed_gateways: &[SkimmedNode],
bonded_nodes: &HashMap<NodeId, NymNodeDetails>,
) -> anyhow::Result<Vec<GatewayInsertRecord>> {
let mut gateway_records = Vec::new();
for gateway in described_gateways {
let identity_key = gateway.ed25519_identity_key().to_base58_string();
let bonded = true;
let bonded = bonded_nodes.contains_key(&gateway.node_id);
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let self_described = serde_json::to_string(&gateway.description)?;
@@ -373,6 +414,7 @@ impl Monitor {
for mixnode in mixnodes {
let mix_id = mixnode.mix_id();
let identity_key = mixnode.identity_key();
// only bonded nodes are given to this function
let bonded = true;
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
let node_info = mixnode.mix_node();
@@ -24,6 +24,7 @@ pub(crate) async fn try_queue_testrun(
explorer_pretty_bond as "explorer_pretty_bond?"
FROM gateways
WHERE gateway_identity_key = ?
AND bonded = true
ORDER BY gateway_identity_key
LIMIT 1"#,
identity_key,