node families (#6715)

* start node families topic branch

* start node families topic branch

* initialise node families contract

* define contract storage

* registering new family in storage

* accepting family invitation

* add_pending_invitation

* revoke_pending_invitation

* remove_family_member

* reject_pending_invitation

* disband_family

* added unit tests for the storage methods

* added restriction on uniquness of family names

* update rustc version for node families contract common

* clippy

* basic queries by id

* query_families_paged

* change family membership storage and expose query for all members of a family

* queries for pending invitations

* queries for past invitations

* queries for past data per node

* queries for past family members

* query_past_members_for_node_paged

* queries for family by name and by owner

* fixup family name normalisation

* fixed incorrect lower bound for queries for past data

* implement contract and storage initialisation

* stubbing tx messages that are to be exposed by the contract

* handler for updating config

* removed partial fee return

* wip: create family

* move mixnet contract interaction traits to shared location

* store original family name alongside the normalised variant

* prevent family creation if owner has a node in another family

* try_disband_family

* try_invite_to_family + shared helpers

* try_revoke_family_invitation

* accept_family_invitation

* stub method for node unbonding

* try_reject_family_invitation

* unit tests for family name normalisation

* try_leave_family

* try_kick_from_family

* fix outdated comments and add paid fee event attribute

* feat: NMv3: leave family upon node unbonding

* NF contract handling of unbonding

* lints

* init node families contract when creating performance contract tester

* clippy

* avoid self-dep in the contract dev deps

* introduced client traits for interacting with the node families contract

* add node families contract to cache refresher

* added query for all node family members (globally) and started scaffolding nym-api caches

* docs and cache -> api conversion

* calculating average node age based on individual timestamps

* wire up node families cache

* http stubs

* filled in the implementation

* route tests + extracting shared code

* review fixes

* feat: expose family information for all dvpn gateway endpoints within NS API

* expose family information for explorer v3 route

* clippy

* review comments and optimise db family update

* feat: Node Families: expose stake information inside DVpnGateway

* chore: update lock files after rebase

* chore: sort workspace members

* explicitly require providing node families contract address for mixnet contract migration

* fix missing node families contract address env export

* dont swallow cache overwrite failures in fixture

* pin network-defaults rustc version due to contracts dep

* further version pinning

* chore: update mixnet contract schema
This commit is contained in:
Jędrzej Stuczyński
2026-05-19 10:36:20 +01:00
committed by GitHub
parent 362f84b5f6
commit a21a01cf1a
139 changed files with 17916 additions and 1448 deletions
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n nd.node_id,\n moniker,\n website,\n security_contact,\n details\n FROM\n nym_node_descriptions nd\n INNER JOIN\n nym_nodes nn on nd.node_id = nn.node_id\n WHERE\n bond_info IS NOT NULL\n ",
"query": "SELECT\n nd.node_id,\n moniker,\n website,\n security_contact,\n details\n FROM\n nym_node_descriptions nd\n INNER JOIN\n nym_nodes nn on nd.node_id = nn.node_id\n WHERE\n bond_info IS NOT NULL\n ",
"describe": {
"columns": [
{
@@ -40,5 +40,5 @@
true
]
},
"hash": "56854f703321ff8d8f30628c7e5322024ea01b778ab55efa9c7c6b219ef36308"
"hash": "0d183e6ad37527d0f624580f702b56cdae6d3fe6b53a8ed5c37fdc4d9e3b7f7c"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n gw.bridges as \"bridges?: serde_json::Value\",\n COALESCE(gd.moniker, 'NA') as \"moniker!\",\n COALESCE(gd.website, 'NA') as \"website!\",\n COALESCE(gd.security_contact, 'NA') as \"security_contact!\",\n COALESCE(gd.details, 'NA') as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n gw.bridges as \"bridges?: serde_json::Value\",\n COALESCE(gd.moniker, 'NA') as \"moniker!\",\n COALESCE(gd.website, 'NA') as \"website!\",\n COALESCE(gd.security_contact, 'NA') as \"security_contact!\",\n COALESCE(gd.details, 'NA') as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key",
"describe": {
"columns": [
{
@@ -94,5 +94,5 @@
null
]
},
"hash": "f25a4eb90c11957669cfad4800d7a0a384b672077abb123c3437962def194e8a"
"hash": "1668f5a5a0abc8a73454953c3f5b61d2afb1b37720f5756b9c6fb3aef55a3027"
}
@@ -0,0 +1,56 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n family_id,\n name,\n description,\n owner,\n family_stake_unym,\n members_count,\n created_at\n FROM node_families\n ORDER BY family_id",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "family_id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "description",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "owner",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "family_stake_unym",
"type_info": "Int8"
},
{
"ordinal": 5,
"name": "members_count",
"type_info": "Int4"
},
{
"ordinal": 6,
"name": "created_at",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
true,
false,
false
]
},
"hash": "3120dc00831742e0981bdcf486a47c8d9e063f183c6da77295529b06495f6a5c"
}
@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "SELECT node_id, family_id\n FROM node_family_members",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "node_id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "family_id",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "33e2ca569a31fd03fdc0b2cbf7840572c15063457bac682e938bee632bd0260f"
}
@@ -0,0 +1,21 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO node_families\n (family_id, name, description, owner, family_stake_unym, members_count, created_at, last_updated_utc)\n SELECT * FROM UNNEST(\n $1::BIGINT[], $2::TEXT[], $3::TEXT[], $4::TEXT[],\n $5::BIGINT[], $6::INTEGER[], $7::BIGINT[], $8::BIGINT[]\n )",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8Array",
"TextArray",
"TextArray",
"TextArray",
"Int8Array",
"Int4Array",
"Int8Array",
"Int8Array"
]
},
"nullable": []
},
"hash": "486bbf89adfd9343549263e84a1fce8994aa1d0b4ef4078a98f5a3efa4c70493"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n bond_info IS NOT NULL\n AND\n self_described IS NOT NULL\n ",
"query": "SELECT\n node_id,\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n bond_info IS NOT NULL\n AND\n self_described IS NOT NULL\n ",
"describe": {
"columns": [
{
@@ -22,5 +22,5 @@
true
]
},
"hash": "227539374e7473f6f9642289c5b5d1bcd636315ab23537cb5f6d2f82a2bcb7bf"
"hash": "5834dcc61f5409f568bad0484395b565b2e258f66f4981ace2cfb36bd8b89991"
}
@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO node_family_members (node_id, family_id, joined_at)\n SELECT * FROM UNNEST($1::BIGINT[], $2::BIGINT[], $3::BIGINT[])",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8Array",
"Int8Array",
"Int8Array"
]
},
"nullable": []
},
"hash": "8c94f3c3bf7b9c3b135dc4b12303419b43304c3c93389abfc4c1d53955b15dd0"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n self_described as \"self_described: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n ORDER BY\n node_id\n ",
"query": "SELECT\n node_id,\n self_described as \"self_described: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n ORDER BY\n node_id\n ",
"describe": {
"columns": [
{
@@ -22,5 +22,5 @@
true
]
},
"hash": "c7656b2b1b4328415772ce69d0568bd5438d6c8496ca9cbdcfb70bb5375b345e"
"hash": "a1bb7a40d073335db7d9061771dfe62b0d2cfa2f7404f4afb8c3d393f0927f04"
}
@@ -0,0 +1,12 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM node_families",
"describe": {
"columns": [],
"parameters": {
"Left": []
},
"nullable": []
},
"hash": "a470feaded5ceee869202ab3a8bd0122403d252c369ebbee7b64cba8eacc4e05"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n ORDER BY\n node_id\n ",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n AND\n bond_info IS NOT NULL\n ",
"describe": {
"columns": [
{
@@ -88,5 +88,5 @@
true
]
},
"hash": "3ddc12cc4e1796b787a50c40560d2bd71d1cfe5f5265e6f161b3122d1317a421"
"hash": "e5c75bbade89f9b1b7ddaad50f48be0cb679568a751c1b7e604a098a824b2a7c"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n AND\n bond_info IS NOT NULL\n ",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n ORDER BY\n node_id\n ",
"describe": {
"columns": [
{
@@ -88,5 +88,5 @@
true
]
},
"hash": "0b51df277ed66c6553f66af9b135342dee177abc1c92e4a89147de3c22d3d1a5"
"hash": "f2d5127e36621cb4dd3a7870e95447ee2ca1d3830ee0cc2237238f119ac50480"
}
@@ -40,6 +40,7 @@ nym-http-api-common = { workspace = true, features = ["middleware"] }
nym-network-defaults = { workspace = true }
nym-serde-helpers = { workspace = true }
nym-statistics-common = { workspace = true }
nym-api-requests = { workspace = true }
nym-validator-client = { workspace = true }
nym-task = { workspace = true }
nym-node-requests = { workspace = true, features = ["openapi", "client"] }
@@ -0,0 +1,24 @@
/*
* Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/
CREATE TABLE IF NOT EXISTS node_families (
family_id BIGINT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL,
owner TEXT NOT NULL,
family_stake_unym BIGINT,
members_count INTEGER NOT NULL,
created_at BIGINT NOT NULL,
last_updated_utc BIGINT NOT NULL
);
CREATE TABLE IF NOT EXISTS node_family_members (
node_id BIGINT PRIMARY KEY,
family_id BIGINT NOT NULL REFERENCES node_families (family_id) ON DELETE CASCADE,
joined_at BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_node_family_members_family_id
ON node_family_members (family_id);
@@ -71,4 +71,21 @@ impl Storage {
pub fn pool_owned(&self) -> DbPool {
self.pool.clone()
}
pub fn pool(&self) -> &DbPool {
&self.pool
}
/// Build a `Storage` view from an existing pool. The wrapped pool is
/// `Clone`-cheap (shares the same underlying connection set), so this is
/// effectively free.
pub(crate) fn from_pool(pool: DbPool) -> Self {
pool.into()
}
}
impl From<DbPool> for Storage {
fn from(pool: DbPool) -> Self {
Storage { pool }
}
}
@@ -672,3 +672,48 @@ pub struct HostKeysDeHelper {
#[serde(default)]
pub x25519_versioned_noise: Option<VersionedNoiseKeyV1>,
}
// ---- node families ----
// it's not dead code but clippy doesn't detect usage in sqlx macros
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct NodeFamilyInsertRecord {
pub family_id: i64,
pub name: String,
pub description: String,
pub owner: String,
pub family_stake_unym: Option<i64>,
/// Member count as reported by nym-api; stored denormalised so callers
/// don't have to aggregate over `node_family_members`.
pub members_count: i32,
pub created_at: i64,
pub last_updated_utc: i64,
pub members: Vec<NodeFamilyMemberInsertRecord>,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct NodeFamilyMemberInsertRecord {
pub node_id: i64,
pub joined_at: i64,
}
#[allow(dead_code)]
#[derive(Debug, Clone, FromRow)]
pub(crate) struct NodeFamilyDto {
pub family_id: i64,
pub name: String,
pub description: String,
pub owner: String,
pub family_stake_unym: Option<i64>,
pub members_count: i32,
pub created_at: i64,
}
#[allow(dead_code)]
#[derive(Debug, Clone, FromRow)]
pub(crate) struct NodeFamilyMemberDto {
pub node_id: i64,
pub family_id: i64,
}
@@ -2,7 +2,7 @@ use std::collections::HashSet;
use crate::{
db::{
DbConnection, DbPool,
DbConnection, DbPool, Storage,
models::{GatewayDto, GatewayInsertRecord},
},
http::models::Gateway,
@@ -30,25 +30,26 @@ pub(crate) async fn select_gateway_identity(
Ok(record.gateway_identity_key)
}
pub(crate) async fn update_bonded_gateways(
pool: &DbPool,
gateways: Vec<GatewayInsertRecord>,
) -> anyhow::Result<()> {
let mut tx = pool.begin().await?;
impl Storage {
pub(crate) async fn update_bonded_gateways(
&self,
gateways: Vec<GatewayInsertRecord>,
) -> anyhow::Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query!(
r#"UPDATE
sqlx::query!(
r#"UPDATE
gateways
SET
bonded = false
"#,
)
.execute(&mut *tx)
.await?;
)
.execute(&mut *tx)
.await?;
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
(gateway_identity_key, bonded,
self_described, explorer_pretty_bond,
last_updated_utc, performance)
@@ -59,57 +60,57 @@ pub(crate) async fn update_bonded_gateways(
explorer_pretty_bond=excluded.explorer_pretty_bond,
last_updated_utc=excluded.last_updated_utc,
performance = excluded.performance;",
record.identity_key,
record.bonded,
record.self_described,
record.explorer_pretty_bond,
record.last_updated_utc,
record.performance as i32
)
.execute(&mut *tx)
.await?;
record.identity_key,
record.bonded,
record.self_described,
record.explorer_pretty_bond,
record.last_updated_utc,
record.performance as i32
)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
tx.commit().await?;
pub(crate) async fn get_all_gateways(&self) -> anyhow::Result<Vec<Gateway>> {
let items = sqlx::query_as!(
GatewayDto,
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
gw.bridges as "bridges?: serde_json::Value",
COALESCE(gd.moniker, 'NA') as "moniker!",
COALESCE(gd.website, 'NA') as "website!",
COALESCE(gd.security_contact, 'NA') as "security_contact!",
COALESCE(gd.details, 'NA') as "details!"
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
ORDER BY gw.gateway_identity_key"#,
)
.fetch(&self.pool)
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
GatewayDto,
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
gw.bridges as "bridges?: serde_json::Value",
COALESCE(gd.moniker, 'NA') as "moniker!",
COALESCE(gd.website, 'NA') as "website!",
COALESCE(gd.security_contact, 'NA') as "security_contact!",
COALESCE(gd.details, 'NA') as "details!"
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
ORDER BY gw.gateway_identity_key"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
let items: Vec<Gateway> = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.inspect_err(|e| error!("Conversion from DTO failed: {e}. Invalidly stored data?"))?;
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
let items: Vec<Gateway> = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.inspect_err(|e| error!("Conversion from DTO failed: {e}. Invalidly stored data?"))?;
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
}
pub(crate) async fn get_bonded_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
@@ -1,90 +1,92 @@
use time::UtcDateTime;
use crate::db::{DbPool, models::NetworkSummary};
use crate::db::{Storage, models::NetworkSummary};
/// take `last_updated` instead of calculating it so that `summary` matches
/// `daily_summary`
pub(crate) async fn insert_summaries(
pool: &DbPool,
summaries: &Vec<(&str, usize)>,
summary: &NetworkSummary,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
insert_summary(pool, summaries, last_updated).await?;
impl Storage {
/// take `last_updated` instead of calculating it so that `summary` matches
/// `daily_summary`
pub(crate) async fn insert_summaries(
&self,
summaries: &Vec<(&str, usize)>,
summary: &NetworkSummary,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
self.insert_summary(summaries, last_updated).await?;
insert_summary_history(pool, summary, last_updated).await?;
self.insert_summary_history(summary, last_updated).await?;
Ok(())
}
Ok(())
}
async fn insert_summary(
pool: &DbPool,
summaries: &Vec<(&str, usize)>,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
let timestamp = last_updated.unix_timestamp();
let mut tx = pool.begin().await?;
async fn insert_summary(
&self,
summaries: &Vec<(&str, usize)>,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
let timestamp = last_updated.unix_timestamp();
let mut tx = self.pool.begin().await?;
for (kind, value) in summaries {
let value = value.to_string();
sqlx::query!(
"INSERT INTO summary
for (kind, value) in summaries {
let value = value.to_string();
sqlx::query!(
"INSERT INTO summary
(key, value_json, last_updated_utc)
VALUES ($1, $2, $3)
ON CONFLICT(key) DO UPDATE SET
value_json=excluded.value_json,
last_updated_utc=excluded.last_updated_utc;",
kind,
value,
timestamp
)
.execute(&mut *tx)
.await
.map_err(|err| {
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
err
})?;
kind,
value,
timestamp
)
.execute(&mut *tx)
.await
.map_err(|err| {
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
err
})?;
}
tx.commit().await?;
Ok(())
}
tx.commit().await?;
/// For `<date_N>`, `summary_history` is updated with fresh data on every
/// iteration.
///
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
/// `<date_N>` stays there forever.
///
/// This is not aggregate data, it's a set of latest data points
async fn insert_summary_history(
&self,
summary: &NetworkSummary,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
let mut conn = self.pool.acquire().await?;
Ok(())
}
let value_json = serde_json::to_string(&summary)?;
let timestamp = last_updated.unix_timestamp();
/// For `<date_N>`, `summary_history` is updated with fresh data on every
/// iteration.
///
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
/// `<date_N>` stays there forever.
///
/// This is not aggregate data, it's a set of latest data points
async fn insert_summary_history(
pool: &DbPool,
summary: &NetworkSummary,
last_updated: UtcDateTime,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let date = datetime_to_only_date_str(last_updated);
let value_json = serde_json::to_string(&summary)?;
let timestamp = last_updated.unix_timestamp();
let date = datetime_to_only_date_str(last_updated);
sqlx::query!(
"INSERT INTO summary_history
sqlx::query!(
"INSERT INTO summary_history
(date, timestamp_utc, value_json)
VALUES ($1, $2, $3)
ON CONFLICT(date) DO UPDATE SET
timestamp_utc=excluded.timestamp_utc,
value_json=excluded.value_json;",
date,
timestamp,
value_json
)
.execute(&mut *conn)
.await?;
date,
timestamp,
value_json
)
.execute(&mut *conn)
.await?;
Ok(())
Ok(())
}
}
/// YYYY-MM-DD, without time
@@ -2,6 +2,7 @@ pub(crate) mod ecash_data;
mod gateways;
mod gateways_stats;
mod misc;
mod node_families;
mod nym_nodes;
mod packet_stats;
pub(crate) mod scraper;
@@ -9,16 +10,10 @@ mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
get_all_gateways, get_bonded_gateway_id_keys, get_or_create_gateway, select_gateway_identity,
update_bonded_gateways,
get_bonded_gateway_id_keys, get_or_create_gateway, select_gateway_identity,
};
pub(crate) use gateways_stats::{delete_old_records, get_sessions_stats, insert_session_records};
pub(crate) use misc::insert_summaries;
pub(crate) use nym_nodes::{
get_all_nym_nodes, get_bonded_node_description, get_daily_stats,
get_described_bonded_nym_nodes, get_described_node_bond_info, get_node_self_description,
update_nym_nodes,
};
pub(crate) use nym_nodes::get_daily_stats;
pub(crate) use packet_stats::{
batch_store_node_scraper_results, get_raw_node_stats, insert_daily_node_stats_uncommitted,
};
@@ -0,0 +1,135 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::db::Storage;
use crate::db::models::{NodeFamilyDto, NodeFamilyInsertRecord, NodeFamilyMemberDto};
use futures_util::TryStreamExt;
use tracing::instrument;
impl Storage {
/// Replace the node-families snapshot atomically. Wipes both
/// `node_families` and `node_family_members` (cascade) and re-inserts
/// the provided records inside a single transaction so reads never
/// observe a partial state.
///
/// Both inserts are batched via `UNNEST(..)`, so the whole refresh is a
/// constant number of round trips regardless of how many families/members
/// the snapshot contains.
#[instrument(level = "debug", skip_all, fields(family_records = family_records.len()))]
pub(crate) async fn update_node_families(
&self,
family_records: Vec<NodeFamilyInsertRecord>,
) -> anyhow::Result<usize> {
let inserted = family_records.len();
// Reshape the row-major records into column-major vectors so we can
// bind each column as a Postgres array and let `UNNEST` expand them
// back into rows.
let mut family_ids: Vec<i64> = Vec::with_capacity(inserted);
let mut names: Vec<String> = Vec::with_capacity(inserted);
let mut descriptions: Vec<String> = Vec::with_capacity(inserted);
let mut owners: Vec<String> = Vec::with_capacity(inserted);
let mut family_stakes: Vec<Option<i64>> = Vec::with_capacity(inserted);
let mut members_counts: Vec<i32> = Vec::with_capacity(inserted);
let mut created_ats: Vec<i64> = Vec::with_capacity(inserted);
let mut last_updated_utcs: Vec<i64> = Vec::with_capacity(inserted);
let total_members: usize = family_records.iter().map(|f| f.members.len()).sum();
let mut member_node_ids: Vec<i64> = Vec::with_capacity(total_members);
let mut member_family_ids: Vec<i64> = Vec::with_capacity(total_members);
let mut member_joined_ats: Vec<i64> = Vec::with_capacity(total_members);
for record in family_records {
let family_id = record.family_id;
family_ids.push(family_id);
names.push(record.name);
descriptions.push(record.description);
owners.push(record.owner);
family_stakes.push(record.family_stake_unym);
members_counts.push(record.members_count);
created_ats.push(record.created_at);
last_updated_utcs.push(record.last_updated_utc);
for member in record.members {
member_node_ids.push(member.node_id);
member_family_ids.push(family_id);
member_joined_ats.push(member.joined_at);
}
}
let mut tx = self.pool.begin().await?;
// ON DELETE CASCADE on the members table wipes both sides
sqlx::query!("DELETE FROM node_families")
.execute(&mut *tx)
.await?;
sqlx::query!(
"INSERT INTO node_families
(family_id, name, description, owner, family_stake_unym, members_count, created_at, last_updated_utc)
SELECT * FROM UNNEST(
$1::BIGINT[], $2::TEXT[], $3::TEXT[], $4::TEXT[],
$5::BIGINT[], $6::INTEGER[], $7::BIGINT[], $8::BIGINT[]
)",
&family_ids[..],
&names[..],
&descriptions[..],
&owners[..],
&family_stakes[..] as &[Option<i64>],
&members_counts[..],
&created_ats[..],
&last_updated_utcs[..],
)
.execute(&mut *tx)
.await?;
sqlx::query!(
"INSERT INTO node_family_members (node_id, family_id, joined_at)
SELECT * FROM UNNEST($1::BIGINT[], $2::BIGINT[], $3::BIGINT[])",
&member_node_ids[..],
&member_family_ids[..],
&member_joined_ats[..],
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(inserted)
}
/// Read every cached family.
pub(crate) async fn get_all_node_families(&self) -> anyhow::Result<Vec<NodeFamilyDto>> {
sqlx::query_as!(
NodeFamilyDto,
r#"SELECT
family_id,
name,
description,
owner,
family_stake_unym,
members_count,
created_at
FROM node_families
ORDER BY family_id"#,
)
.fetch(&self.pool)
.try_collect::<Vec<_>>()
.await
.map_err(From::from)
}
/// Read every cached `(node_id, family_id)` membership pair.
pub(crate) async fn get_all_node_family_members(
&self,
) -> anyhow::Result<Vec<NodeFamilyMemberDto>> {
sqlx::query_as!(
NodeFamilyMemberDto,
r#"SELECT node_id, family_id
FROM node_family_members"#,
)
.fetch(&self.pool)
.try_collect::<Vec<_>>()
.await
.map_err(From::from)
}
}
@@ -1,4 +1,5 @@
use crate::db::DbConnection;
use crate::db::Storage;
use crate::db::models::NymNodeDescriptionDeHelper;
use crate::http::models::DailyStats;
use crate::{
@@ -13,100 +14,95 @@ use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::client::{NodeId, NymNodeDetails};
use nym_validator_client::models::NymNodeDescriptionV2;
use std::collections::HashMap;
use tracing::{instrument, warn};
use tracing::{error, instrument, warn};
pub(crate) async fn get_all_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<NymNodeDto>> {
let mut conn = pool.acquire().await?;
impl Storage {
pub(crate) async fn get_all_nym_nodes(&self) -> anyhow::Result<Vec<NymNodeDto>> {
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
ed25519_identity_pubkey,
total_stake,
ip_addresses as "ip_addresses!: serde_json::Value",
mix_port,
x25519_sphinx_pubkey,
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
FROM
nym_nodes
ORDER BY
node_id
"#,
)
.fetch(&self.pool)
.try_collect::<Vec<NymNodeDto>>()
.await
.map_err(From::from)
}
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
ed25519_identity_pubkey,
total_stake,
ip_addresses as "ip_addresses!: serde_json::Value",
mix_port,
x25519_sphinx_pubkey,
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
FROM
nym_nodes
ORDER BY
node_id
"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<NymNodeDto>>()
.await
.map_err(From::from)
}
/// if a node doesn't expose its self-described endpoint, it can't route traffic
/// - https://nym.com/docs/operators/nodes/nym-node/bonding
///
/// same if it's not bonded in the mixnet smart contract
/// - https://nym.com/docs/operators/tokenomics/mixnet-rewards#rewarded-set-selection
pub(crate) async fn get_described_bonded_nym_nodes(&self) -> anyhow::Result<Vec<NymNodeDto>> {
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
ed25519_identity_pubkey,
total_stake,
ip_addresses as "ip_addresses!: serde_json::Value",
mix_port,
x25519_sphinx_pubkey,
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
FROM
nym_nodes
WHERE
self_described IS NOT NULL
AND
bond_info IS NOT NULL
"#,
)
.fetch(&self.pool)
.try_collect::<Vec<NymNodeDto>>()
.await
.map_err(From::from)
}
/// if a node doesn't expose its self-described endpoint, it can't route traffic
/// - https://nym.com/docs/operators/nodes/nym-node/bonding
///
/// same if it's not bonded in the mixnet smart contract
/// - https://nym.com/docs/operators/tokenomics/mixnet-rewards#rewarded-set-selection
pub(crate) async fn get_described_bonded_nym_nodes(
pool: &DbPool,
) -> anyhow::Result<Vec<NymNodeDto>> {
let mut conn = pool.acquire().await?;
#[instrument(level = "debug", skip_all, fields(node_records=node_records.len()))]
pub(crate) async fn update_nym_nodes(
&self,
node_records: Vec<NymNodeInsertRecord>,
) -> anyhow::Result<usize> {
let mut tx = self.pool.begin().await?;
sqlx::query_as!(
NymNodeDto,
r#"SELECT
node_id,
ed25519_identity_pubkey,
total_stake,
ip_addresses as "ip_addresses!: serde_json::Value",
mix_port,
x25519_sphinx_pubkey,
node_role as "node_role: serde_json::Value",
supported_roles as "supported_roles: serde_json::Value",
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
FROM
nym_nodes
WHERE
self_described IS NOT NULL
AND
bond_info IS NOT NULL
"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<NymNodeDto>>()
.await
.map_err(From::from)
}
#[instrument(level = "debug", skip_all, fields(node_records=node_records.len()))]
pub(crate) async fn update_nym_nodes(
pool: &DbPool,
node_records: Vec<NymNodeInsertRecord>,
) -> anyhow::Result<usize> {
let mut tx = pool.begin().await?;
sqlx::query!(
"UPDATE nym_nodes
sqlx::query!(
"UPDATE nym_nodes
SET
self_described = NULL,
bond_info = NULL",
)
.execute(&mut *tx)
.await?;
)
.execute(&mut *tx)
.await?;
let inserted = node_records.len();
for record in node_records {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
"INSERT INTO nym_nodes
let inserted = node_records.len();
for record in node_records {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
"INSERT INTO nym_nodes
(node_id, ed25519_identity_pubkey,
total_stake,
ip_addresses, mix_port,
@@ -131,146 +127,148 @@ pub(crate) async fn update_nym_nodes(
last_updated_utc=excluded.last_updated_utc,
http_api_port=excluded.http_api_port
;",
record.node_id,
record.ed25519_identity_pubkey,
record.total_stake,
record.ip_addresses,
record.mix_port,
record.x25519_sphinx_pubkey,
record.node_role,
record.supported_roles,
record.entry,
record.self_described,
record.bond_info,
record.performance,
record.last_updated_utc as i32,
record.http_api_port,
)
.execute(&mut *tx)
.await
.map_err(|e| anyhow::anyhow!("Failed to INSERT node_id={}: {}", record.node_id, e))?;
record.node_id,
record.ed25519_identity_pubkey,
record.total_stake,
record.ip_addresses,
record.mix_port,
record.x25519_sphinx_pubkey,
record.node_role,
record.supported_roles,
record.entry,
record.self_described,
record.bond_info,
record.performance,
record.last_updated_utc as i32,
record.http_api_port,
)
.execute(&mut *tx)
.await
.map_err(|e| anyhow::anyhow!("Failed to INSERT node_id={}: {}", record.node_id, e))?;
}
tx.commit().await?;
Ok(inserted)
}
tx.commit().await?;
pub(crate) async fn get_described_node_bond_info(
&self,
) -> anyhow::Result<HashMap<NodeId, NymNodeDetails>> {
sqlx::query!(
r#"SELECT
node_id,
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
WHERE
bond_info IS NOT NULL
AND
self_described IS NOT NULL
"#,
)
.fetch_all(&self.pool)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
let node_id = record.node_id;
record
.bond_info
// only return details for nodes which have details stored
.and_then(|bond_info| {
serde_json::from_value::<NymNodeDetails>(bond_info)
.inspect_err(|err| {
error!("malformed bond_info for node {node_id}: {err}")
})
.ok()
})
.map(|res| (node_id as NodeId, res))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
Ok(inserted)
}
pub(crate) async fn get_node_self_description(
&self,
) -> anyhow::Result<HashMap<NodeId, NymNodeDescriptionV2>> {
sqlx::query!(
r#"SELECT
node_id,
self_described as "self_described: serde_json::Value"
FROM
nym_nodes
WHERE
self_described IS NOT NULL
ORDER BY
node_id
"#,
)
.fetch_all(&self.pool)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
let node_id = record.node_id;
record
.self_described
// only return details for nodes which have details stored
.and_then(|description| {
serde_json::from_value::<NymNodeDescriptionDeHelper>(description)
.inspect_err(|err| {
warn!("malformed description data for node {node_id}: {err}")
})
.ok()
})
.map(|res| (record.node_id as NodeId, res.into()))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
pub(crate) async fn get_described_node_bond_info(
pool: &DbPool,
) -> anyhow::Result<HashMap<NodeId, NymNodeDetails>> {
let mut conn = pool.acquire().await?;
sqlx::query!(
r#"SELECT
node_id,
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
WHERE
bond_info IS NOT NULL
AND
self_described IS NOT NULL
"#,
)
.fetch_all(&mut *conn)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
record
.bond_info
// only return details for nodes which have details stored
.and_then(|bond_info| serde_json::from_value::<NymNodeDetails>(bond_info).ok())
.map(|res| (record.node_id as NodeId, res))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
pub(crate) async fn get_node_self_description(
pool: &DbPool,
) -> anyhow::Result<HashMap<NodeId, NymNodeDescriptionV2>> {
let mut conn = pool.acquire().await?;
sqlx::query!(
r#"SELECT
node_id,
self_described as "self_described: serde_json::Value"
FROM
nym_nodes
WHERE
self_described IS NOT NULL
ORDER BY
node_id
"#,
)
.fetch_all(&mut *conn)
.await
.map(|records| {
records
.into_iter()
.filter_map(|record| {
let node_id = record.node_id;
record
.self_described
// only return details for nodes which have details stored
.and_then(|description| {
serde_json::from_value::<NymNodeDescriptionDeHelper>(description)
.inspect_err(|err| {
warn!("malformed description data for node {node_id}: {err}")
})
.ok()
})
.map(|res| (record.node_id as NodeId, res.into()))
})
.collect::<HashMap<_, _>>()
})
.map_err(From::from)
}
pub(crate) async fn get_bonded_node_description(
pool: &DbPool,
) -> anyhow::Result<HashMap<NodeId, NodeDescription>> {
let mut conn = pool.acquire().await?;
sqlx::query!(
r#"SELECT
nd.node_id,
moniker,
website,
security_contact,
details
FROM
nym_node_descriptions nd
INNER JOIN
nym_nodes nn on nd.node_id = nn.node_id
WHERE
bond_info IS NOT NULL
"#,
)
.fetch_all(&mut *conn)
.await
.map(|records| {
records
.into_iter()
.map(|elem| {
let node_id: NodeId = elem.node_id.try_into().unwrap_or_default();
(
node_id,
NodeDescription {
moniker: elem.moniker.unwrap_or_default(),
website: elem.website.unwrap_or_default(),
security_contact: elem.security_contact.unwrap_or_default(),
details: elem.details.unwrap_or_default(),
},
)
})
.collect::<HashMap<NodeId, NodeDescription>>()
})
.map_err(From::from)
pub(crate) async fn get_bonded_node_description(
&self,
) -> anyhow::Result<HashMap<NodeId, NodeDescription>> {
sqlx::query!(
r#"SELECT
nd.node_id,
moniker,
website,
security_contact,
details
FROM
nym_node_descriptions nd
INNER JOIN
nym_nodes nn on nd.node_id = nn.node_id
WHERE
bond_info IS NOT NULL
"#,
)
.fetch_all(&self.pool)
.await
.map(|records| {
records
.into_iter()
.map(|elem| {
let node_id: NodeId = elem.node_id.try_into().unwrap_or_default();
(
node_id,
NodeDescription {
moniker: elem.moniker.unwrap_or_default(),
website: elem.website.unwrap_or_default(),
security_contact: elem.security_contact.unwrap_or_default(),
details: elem.details.unwrap_or_default(),
},
)
})
.collect::<HashMap<NodeId, NodeDescription>>()
})
.map_err(From::from)
}
}
pub(crate) async fn insert_nym_node_description(
@@ -18,7 +18,8 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
let gateway_keys = queries::get_bonded_gateway_id_keys(pool).await?;
let mut entry_exit_nodes = 0;
let skimmed_nodes = queries::get_described_bonded_nym_nodes(pool)
let skimmed_nodes = crate::db::Storage::from_pool(pool.clone())
.get_described_bonded_nym_nodes()
.await
.map(|nodes_dto| {
nodes_dto.into_iter().filter_map(|node_dto| {
@@ -51,7 +51,7 @@ pub async fn get_gateways_by_country(
Ok(Json(
state
.cache()
.get_dvpn_gateway_list(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_dvpn_gateway_list(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.filter(|gw| gw.location.two_letter_iso_country_code.to_uppercase() == country.alpha2)
@@ -75,7 +75,7 @@ pub async fn get_gateway_countries(state: State<AppState>) -> HttpResult<Json<Ve
Ok(Json(
state
.cache()
.get_dvpn_gateway_list(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_dvpn_gateway_list(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.map(|gw| gw.location.two_letter_iso_country_code.to_string())
@@ -39,7 +39,7 @@ pub async fn get_entry_gateways(state: State<AppState>) -> HttpResult<Json<Vec<D
Ok(Json(
state
.cache()
.get_entry_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_entry_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await,
))
}
@@ -60,7 +60,7 @@ pub async fn get_entry_gateway_countries(state: State<AppState>) -> HttpResult<J
Ok(Json(
state
.cache()
.get_entry_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_entry_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.map(|gw| gw.location.two_letter_iso_country_code.to_string())
@@ -92,7 +92,7 @@ pub async fn get_entry_gateways_by_country(
Ok(Json(
state
.cache()
.get_entry_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_entry_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.filter(|gw| gw.location.two_letter_iso_country_code.to_uppercase() == country.alpha2)
@@ -39,7 +39,7 @@ pub async fn get_exit_gateways(state: State<AppState>) -> HttpResult<Json<Vec<DV
Ok(Json(
state
.cache()
.get_exit_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_exit_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await,
))
}
@@ -60,7 +60,7 @@ pub async fn get_entry_gateway_countries(state: State<AppState>) -> HttpResult<J
Ok(Json(
state
.cache()
.get_exit_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_exit_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.map(|gw| gw.location.two_letter_iso_country_code.to_string())
@@ -92,7 +92,7 @@ pub async fn get_exit_gateways_by_country(
Ok(Json(
state
.cache()
.get_exit_dvpn_gateways(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_exit_dvpn_gateways(state.storage(), &MIN_SUPPORTED_VERSION)
.await
.into_iter()
.filter(|gw| gw.location.two_letter_iso_country_code.to_uppercase() == country.alpha2)
@@ -59,20 +59,22 @@ pub async fn dvpn_gateways(
None => MIN_SUPPORTED_VERSION.clone(),
};
let storage = state.storage();
Ok(Json(
state
.cache()
.get_dvpn_gateway_list(state.db_pool(), &min_node_version)
.get_dvpn_gateway_list(storage, &min_node_version)
.await,
))
}
#[instrument(level = tracing::Level::INFO, skip(state))]
pub async fn dvpn_gateway_ips(state: State<AppState>) -> HttpResult<Json<Vec<String>>> {
let storage = state.storage();
Ok(Json(
state
.cache()
.get_gateway_ips(state.db_pool(), &MIN_SUPPORTED_VERSION)
.get_gateway_ips(storage, &MIN_SUPPORTED_VERSION)
.await,
))
}
@@ -34,8 +34,8 @@ async fn gateways(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Gateway>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
let storage = state.storage();
let res = state.cache().get_gateway_list(storage).await;
Ok(Json(PagedResult::paginate(pagination, res)))
}
@@ -55,8 +55,8 @@ async fn gateways_skinny(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<GatewaySkinny>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
let storage = state.storage();
let res = state.cache().get_gateway_list(storage).await;
let res: Vec<GatewaySkinny> = filter_bonded_gateways_to_skinny(res);
Ok(Json(PagedResult::paginate(pagination, res)))
@@ -83,8 +83,8 @@ async fn get_gateway(
Path(IdentityKeyParam { identity_key }): Path<IdentityKeyParam>,
State(state): State<AppState>,
) -> HttpResult<Json<Gateway>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
let storage = state.storage();
let res = state.cache().get_gateway_list(storage).await;
match res
.iter()
@@ -40,12 +40,12 @@ async fn nym_nodes(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<ExtendedNymNode>>> {
let db = state.db_pool();
let storage = state.storage();
let node_geocache = state.node_geocache();
let nodes = state
.cache()
.get_nym_nodes_list(db, node_geocache)
.get_nym_nodes_list(storage, node_geocache)
.await
.map_err(|e| {
tracing::error!("{e}");
@@ -43,14 +43,14 @@ async fn mixnodes(
Query(params): Query<ServicesQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Service>>> {
let db = state.db_pool();
let storage = state.storage();
let cache = state.cache();
let paths = ParseJsonPaths::new().map_err(|e| {
tracing::error!("Invalidly configured ParseJsonPaths: {e}");
HttpError::internal()
})?;
let res = cache.get_gateway_list(db).await;
let res = cache.get_gateway_list(storage).await;
let services = gateway_list_to_services(&paths, res, params.clone());
Ok(Json(PagedResult::paginate(
@@ -9,7 +9,7 @@ use crate::{
monitor::ExplorerPrettyBond,
};
use cosmwasm_std::{Addr, Coin, Decimal};
use nym_mixnet_contract_common::CoinSchema;
use nym_mixnet_contract_common::{CoinSchema, NodeRewarding};
use nym_node_requests::api::v1::node::models::NodeDescription;
use nym_validator_client::{
client::NodeId,
@@ -29,6 +29,7 @@ use utoipa::ToSchema;
use crate::db::models::NymNodeDataDeHelper;
use crate::node_scraper::models::BridgeInformation;
use crate::monitor::geodata;
pub(crate) use nym_node_status_client::models::TestrunAssignment;
pub(crate) mod gw_probe;
@@ -50,6 +51,128 @@ pub struct Gateway {
pub bridges: Option<serde_json::Value>,
}
impl Gateway {
fn geo_location(&self) -> anyhow::Result<geodata::Location> {
self.explorer_pretty_bond
.clone()
.ok_or_else(|| anyhow::anyhow!("Missing explorer_pretty_bond"))
.and_then(|value| {
serde_json::from_value::<ExplorerPrettyBond>(value).map_err(From::from)
})
.map(|bond| bond.location)
}
pub(crate) fn location(&self) -> anyhow::Result<Location> {
let geolocation = self.geo_location()?;
Ok(Location {
latitude: geolocation.location.latitude,
longitude: geolocation.location.longitude,
two_letter_iso_country_code: geolocation.two_letter_iso_country_code,
org: geolocation.org,
city: geolocation.city,
region: geolocation.region,
postal: geolocation.postal,
timezone: geolocation.timezone,
asn: geolocation.asn.map(|a| {
let kind = if a.kind.eq_ignore_ascii_case("isp") {
// we consider anything that is "ISP" from ipinfo to be residential
AsnKind::Residential
} else {
// everything else is considered "other"
AsnKind::Other
};
Asn {
asn: a.asn,
domain: a.domain,
kind,
name: a.name,
route: a.route,
}
}),
})
}
pub(crate) fn self_described(&self) -> anyhow::Result<NymNodeDataDeHelper> {
self.self_described
.clone()
.ok_or_else(|| anyhow::anyhow!("Missing self_described"))
.and_then(|value| {
serde_json::from_value::<NymNodeDataDeHelper>(value).map_err(From::from)
})
}
pub(crate) fn bridges(&self) -> Option<BridgeInformation> {
self.bridges.clone().and_then(|v| {
serde_json::from_value::<BridgeInformation>(v)
.inspect_err(|err| {
error!(
"Failed to deserialize bridges for gateway identity {}: {err}",
self.gateway_identity_key
);
})
.ok()
})
}
fn last_probe_result(&self) -> anyhow::Result<Option<LastProbeResult>> {
let Some(last_probe) = &self.last_probe_result else {
return Ok(None);
};
let probe =
LastProbeResult::deserialize_with_fallback(last_probe.clone()).inspect_err(|err| {
error!("Failed to deserialize probe result: {err}");
})?;
tracing::trace!("🌈 gateway probe parsed: {probe:?}");
Ok(Some(probe))
}
pub(crate) fn last_dvpn_probe_result(
&self,
socks5_score: Option<&ScoreValue>,
) -> anyhow::Result<Option<DvpnProbeOutcome>> {
let Some(last_probe) = self.last_probe_result()? else {
return Ok(None);
};
let socks5_score = socks5_score.unwrap_or(&ScoreValue::Offline).to_owned();
let dvpn_probe_result =
DvpnProbeOutcome::from_raw_probe_outcome(last_probe.outcome(), socks5_score);
Ok(Some(dvpn_probe_result))
}
pub(crate) fn performance_v2(&self) -> anyhow::Result<Option<DVpnGatewayPerformance>> {
let Some(last_probe) = self.last_probe_result()? else {
return Ok(None);
};
let last_updated_utc = self.last_testrun_utc.clone().unwrap_or_default();
let network_monitor_performance_mixnet_mode = self.performance as f32 / 100f32;
let mixnet_score = calculate_mixnet_score(self);
let score = calc_gateway_visual_score(self, &last_probe);
let mut load = calculate_load(&last_probe);
// clamp the load value to offline, when the score is offline
if score == ScoreValue::Offline {
load = ScoreValue::Offline;
}
let performance_v2 = DVpnGatewayPerformance {
last_updated_utc: last_updated_utc.to_string(),
load,
score,
mixnet_score,
// the network monitor's measure is a good proxy for node uptime, it can be improved in the future
uptime_percentage_last_24_hours: network_monitor_performance_mixnet_mode,
};
Ok(Some(performance_v2))
}
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct BuildInformation {
pub build_version: String,
@@ -157,6 +280,66 @@ impl From<&LewesProtocolDetailsDataV1Validator> for LewesProtocolDetailsDataV1 {
}
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct NodeFamilyInformation {
// family id
id: u32,
// family name
name: String,
// description
description: String,
// amount in unym
family_stake: u128,
// number of members
members: usize,
}
impl NodeFamilyInformation {
pub(crate) fn new(
id: u32,
name: String,
description: String,
family_stake: u128,
members: usize,
) -> Self {
Self {
id,
name,
description,
family_stake,
members,
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct NodeStakeInformation {
// delegations + bond
total_stake: u128,
total_delegations: u128,
total_bond: u128,
// number of delegations
delegations: usize,
}
impl From<&NodeRewarding> for NodeStakeInformation {
fn from(rewarding: &NodeRewarding) -> Self {
let denom = &rewarding.cost_params.interval_operating_cost.denom;
let total_bond = rewarding.operator_pledge_with_reward(denom).amount.u128();
let total_delegations = rewarding.delegations_with_reward(denom).amount.u128();
NodeStakeInformation {
total_stake: total_bond + total_delegations,
total_bond,
total_delegations,
delegations: rewarding.unique_delegations as usize,
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DVpnGateway {
pub identity_key: String,
@@ -183,6 +366,9 @@ pub struct DVpnGateway {
pub lewes_protocol_details: Option<LewesProtocolDetailsV1>,
pub family_data: Option<NodeFamilyInformation>,
pub staking_data: Option<NodeStakeInformation>,
pub build_information: BinaryBuildInformationOwned,
}
@@ -208,73 +394,20 @@ impl DVpnGateway {
gateway: Gateway,
skimmed_node: &SkimmedNodeV1,
socks5_score: Option<&ScoreValue>,
family_details: Option<NodeFamilyInformation>,
staking_details: Option<NodeStakeInformation>,
) -> anyhow::Result<Self> {
let location = gateway
.explorer_pretty_bond
.clone()
.ok_or_else(|| anyhow::anyhow!("Missing explorer_pretty_bond"))
.and_then(|value| {
serde_json::from_value::<ExplorerPrettyBond>(value).map_err(From::from)
})
.map(|bond| bond.location)?;
let self_described: NymNodeDataDeHelper = gateway
.self_described
.clone()
.ok_or_else(|| anyhow::anyhow!("Missing self_described"))
.and_then(|value| {
serde_json::from_value::<NymNodeDataDeHelper>(value).map_err(From::from)
})?;
let location = gateway.location()?;
let self_described = gateway.self_described()?;
let last_updated_utc = gateway.last_testrun_utc.clone().unwrap_or_default();
let performance = to_percent(gateway.performance);
let network_monitor_performance_mixnet_mode = gateway.performance as f32 / 100f32;
let bridges = gateway.bridges.clone().and_then(|v| {
serde_json::from_value(v)
.inspect_err(|err| {
error!(
"Failed to deserialize bridges for gateway identity {}: {err}",
gateway.gateway_identity_key
);
})
.ok()
});
let bridges = gateway.bridges();
tracing::debug!("🌈 gateway probe result: {:?}", gateway.last_probe_result);
let (last_probe_result, performance_v2) = match gateway.last_probe_result {
Some(ref value) => {
let parsed = LastProbeResult::deserialize_with_fallback(value.clone())
.inspect_err(|err| {
error!("Failed to deserialize probe result: {err}");
})?;
tracing::trace!("🌈 gateway probe parsed: {:?}", parsed);
let mixnet_score = calculate_mixnet_score(&gateway);
let score = calc_gateway_visual_score(&gateway, &parsed);
let mut load = calculate_load(&parsed);
let socks5_score = socks5_score.unwrap_or(&ScoreValue::Offline).to_owned();
let dvpn_probe_result =
DvpnProbeOutcome::from_raw_probe_outcome(parsed.outcome(), socks5_score);
// clamp the load value to offline, when the score is offline
if score == ScoreValue::Offline {
load = ScoreValue::Offline;
}
let performance_v2 = DVpnGatewayPerformance {
last_updated_utc: last_updated_utc.to_string(),
load,
score,
mixnet_score,
// the network monitor's measure is a good proxy for node uptime, it can be improved in the future
uptime_percentage_last_24_hours: network_monitor_performance_mixnet_mode,
};
(Some(dvpn_probe_result), Some(performance_v2))
}
None => (None, None),
};
let last_probe_result = gateway.last_dvpn_probe_result(socks5_score)?;
let performance_v2 = gateway.performance_v2()?;
Ok(Self {
identity_key: gateway.gateway_identity_key,
@@ -282,32 +415,7 @@ impl DVpnGateway {
description: Some(gateway.description.details),
ip_packet_router: self_described.ip_packet_router,
authenticator: self_described.authenticator,
location: Location {
latitude: location.location.latitude,
longitude: location.location.longitude,
two_letter_iso_country_code: location.two_letter_iso_country_code,
org: location.org,
city: location.city,
region: location.region,
postal: location.postal,
timezone: location.timezone,
asn: location.asn.map(|a| {
let kind = if a.kind.eq_ignore_ascii_case("isp") {
// we consider anything that is "ISP" from ipinfo to be residential
AsnKind::Residential
} else {
// everything else is considered "other"
AsnKind::Other
};
Asn {
asn: a.asn,
domain: a.domain,
kind,
name: a.name,
route: a.route,
}
}),
},
location,
last_probe: last_probe_result
.map(|res| DvpnGwProbe::from_outcome(res, last_updated_utc)),
ip_addresses: skimmed_node.ip_addresses.clone(),
@@ -321,6 +429,8 @@ impl DVpnGateway {
.lewes_protocol
.as_ref()
.map(LewesProtocolDetailsV1::from),
family_data: family_details,
staking_data: staking_details,
build_information: self_described.build_information,
})
}
@@ -664,6 +774,7 @@ pub(crate) struct ExtendedNymNode {
pub(crate) rewarding_details: Option<nym_mixnet_contract_common::NodeRewarding>,
pub(crate) description: NodeDescription,
pub(crate) geoip: Option<NodeGeoData>,
pub family_data: Option<NodeFamilyInformation>,
}
#[derive(Clone, Debug, utoipa::ToSchema, Deserialize, Serialize)]
@@ -1,6 +1,6 @@
use crate::ticketbook_manager::state::TicketbookManagerState;
use crate::{
db::DbPool,
db,
http::{api::RouterBuilder, state::AppState},
monitor::{DelegationsCache, NodeGeoCache},
};
@@ -15,7 +15,7 @@ use tokio::{net::TcpListener, sync::RwLock};
/// background tokio task
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_http_api(
db_pool: DbPool,
storage: db::Storage,
http_port: u16,
nym_http_cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
@@ -29,7 +29,7 @@ pub(crate) async fn start_http_api(
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(
db_pool,
storage,
nym_http_cache_ttl,
agent_key_list,
agent_max_count,
@@ -15,9 +15,10 @@ use tokio::sync::RwLock;
use tracing::{error, instrument, trace, warn};
use utoipa::ToSchema;
use super::models::SessionStats;
use super::models::{NodeFamilyInformation, NodeStakeInformation, SessionStats};
use crate::{
db::{DbPool, queries},
db,
db::DbPool,
http::{
error::{HttpError, HttpResult},
models::{
@@ -30,10 +31,11 @@ use crate::{
use crate::ticketbook_manager::state::TicketbookManagerState;
pub(crate) use nym_validator_client::models::BinaryBuildInformationOwned;
use nym_validator_client::nyxd::contract_traits::node_families_query_client::NodeFamilyId;
#[derive(Clone)]
pub(crate) struct AppState {
db_pool: DbPool,
storage: db::Storage,
cache: HttpCache,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
@@ -47,7 +49,7 @@ pub(crate) struct AppState {
impl AppState {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
db_pool: DbPool,
storage: db::Storage,
cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
@@ -57,7 +59,7 @@ impl AppState {
ticketbook_manager_state: TicketbookManagerState,
) -> Self {
Self {
db_pool,
storage,
cache: HttpCache::new(cache_ttl).await,
agent_key_list,
agent_max_count,
@@ -70,7 +72,11 @@ impl AppState {
}
pub(crate) fn db_pool(&self) -> &DbPool {
&self.db_pool
self.storage.pool()
}
pub(crate) fn storage(&self) -> &db::Storage {
&self.storage
}
pub(crate) fn cache(&self) -> &HttpCache {
@@ -225,7 +231,7 @@ impl HttpCache {
.await
}
pub async fn get_gateway_list(&self, db: &DbPool) -> Vec<Gateway> {
pub async fn get_gateway_list(&self, storage: &db::Storage) -> Vec<Gateway> {
match self.gateways.get(GATEWAYS_LIST_KEY).await {
Some(guard) => {
tracing::trace!("Fetching from cache...");
@@ -236,7 +242,7 @@ impl HttpCache {
// the key is missing so populate it
tracing::trace!("No gateways in cache, refreshing cache from DB...");
let gateways = match crate::db::queries::get_all_gateways(db).await {
let gateways = match storage.get_all_gateways().await {
Ok(gws) => {
tracing::info!("Successfully fetched {} gateways from database", gws.len());
if !gws.is_empty() {
@@ -282,148 +288,185 @@ impl HttpCache {
pub async fn get_dvpn_gateway_list(
&self,
db: &DbPool,
storage: &db::Storage,
min_node_version: &Version,
) -> Vec<DVpnGateway> {
match self.dvpn_gateways.get(DVPN_GATEWAYS_LIST_KEY).await {
let gateways = match self.dvpn_gateways.get(DVPN_GATEWAYS_LIST_KEY).await {
Some(guard) => {
tracing::trace!("Fetching from cache...");
let read_lock = guard.read().await;
read_lock.clone()
guard.read().await.clone()
}
None => {
tracing::info!("No gateways (dVPN) in cache, refreshing from DB...");
let gateways = self.get_gateway_list(db).await;
tracing::info!("Found {} gateways in database", gateways.len());
let started_with = gateways.len();
let skimmed_nodes = match crate::db::queries::get_described_bonded_nym_nodes(db)
.await
{
Ok(records) => {
let mut nodes = HashMap::new();
for dto in records {
match SkimmedNodeV1::try_from(dto) {
Ok(skimmed_node) => {
let key =
skimmed_node.ed25519_identity_pubkey.to_base58_string();
nodes.insert(key, skimmed_node);
}
Err(err) => {
error!(
"CRITICAL: Failed to convert NymNodeDto to SkimmedNode: {err}"
);
panic!(
"Cannot convert database record to SkimmedNode - this should never happen! Error: {err}"
);
}
}
}
nodes
}
Err(err) => {
error!("CRITICAL: Failed to query nym_nodes from database: {err}");
panic!(
"Cannot read nym_nodes table - database connection issue? Error: {err}"
);
}
};
let socks5_scores = calculate_socks5_percentiles(&gateways);
let res_gws = gateways
.iter()
.filter(|gw| gw.bonded)
.filter_map(|gw| match skimmed_nodes.get(&gw.gateway_identity_key) {
Some(skimmed_node) => Some((gw, skimmed_node)),
None => {
error!(
"CRITICAL: Gateway {} exists in gateways table but not in nym_nodes table! This should not happen.",
gw.gateway_identity_key
);
None
}
})
.filter_map(
|(gw, skimmed_node)| match DVpnGateway::new(gw.clone(), skimmed_node, socks5_scores.get(&gw.gateway_identity_key)) {
Ok(gw) => Some(gw),
Err(err) => {
error!(
"CRITICAL: Failed to create DVpnGateway for node_id={}, identity_key={}: {}",
skimmed_node.node_id,
skimmed_node.ed25519_identity_pubkey.to_base58_string(),
err
);
// Don't panic here as this might be due to missing fields, but log it loudly
None
}
},
)
.filter(|gw| {
let gw_version = &gw.build_information.build_version;
if let Ok(gw_version) = Version::parse(gw_version) {
&gw_version >= min_node_version
} else {
warn!("Failed to parse GW version {}", gw_version);
false
}
})
.filter(|gw| {
// gateways must have a country
if gw.location.two_letter_iso_country_code.len() == 2 {
true
} else {
warn!(
"Invalid country code: {}",
gw.location.two_letter_iso_country_code
);
false
}
})
// sort by country, then by identity key
.sorted_by_key(|item| {
(
item.location.two_letter_iso_country_code.clone(),
item.identity_key.clone(),
)
})
.collect::<Vec<_>>();
let bonded_count = gateways.iter().filter(|gw| gw.bonded).count();
tracing::info!(
"DVpn gateway filtering: {} total gateways, {} bonded, {} nym_nodes, {} final DVpn gateways",
started_with,
bonded_count,
skimmed_nodes.len(),
res_gws.len()
);
if res_gws.is_empty() && started_with > 0 {
tracing::error!(
"CRITICAL: Started with {} gateways but got 0 DVpn gateways! Min version: {}",
started_with,
min_node_version
);
} else {
tracing::info!(
"Successfully loaded {} DVpn gateways into cache",
res_gws.len()
);
self.upsert_dvpn_gateway_list(res_gws.clone()).await;
let built = self.build_dvpn_gateway_list(storage).await;
if !built.is_empty() {
self.upsert_dvpn_gateway_list(built.clone()).await;
}
built
}
};
res_gws
// version filter is applied at read time, not at cache-fill time, so
// requests with different `min_node_version` values do not poison each
// other's results.
gateways
.into_iter()
.filter(|gw| {
let gw_version = &gw.build_information.build_version;
match Version::parse(gw_version) {
Ok(parsed) => &parsed >= min_node_version,
Err(_) => {
warn!("Failed to parse GW version {gw_version}");
false
}
}
})
.collect()
}
/// Rebuild the dVPN gateway list from DB. Does **not** apply any version
/// filter — that's done at read time.
async fn build_dvpn_gateway_list(&self, storage: &db::Storage) -> Vec<DVpnGateway> {
let gateways = self.get_gateway_list(storage).await;
tracing::info!("Found {} gateways in database", gateways.len());
let started_with = gateways.len();
let records = match storage.get_described_bonded_nym_nodes().await {
Ok(records) => records,
Err(err) => {
error!("CRITICAL: Failed to query nym_nodes from database: {err}");
panic!("Cannot read nym_nodes table - database connection issue? Error: {err}");
}
};
let mut skimmed_nodes = HashMap::new();
for dto in records {
match SkimmedNodeV1::try_from(dto) {
Ok(skimmed_node) => {
let key = skimmed_node.ed25519_identity_pubkey.to_base58_string();
skimmed_nodes.insert(key, skimmed_node);
}
Err(err) => {
error!("CRITICAL: Failed to convert NymNodeDto to SkimmedNode: {err}");
panic!(
"Cannot convert database record to SkimmedNode - this should never happen! Error: {err}"
);
}
}
}
let family_lookup = match load_family_lookup(storage).await {
Ok(lookup) => lookup,
Err(err) => {
error!("CRITICAL: Failed to load node-families lookup from database: {err}");
panic!(
"Cannot read node_families tables - database connection issue? Error: {err}"
);
}
};
let bond_info = match storage.get_described_node_bond_info().await {
Ok(map) => map,
Err(err) => {
error!("CRITICAL: Failed to load node bond info from database: {err}");
panic!(
"Cannot read nym_nodes.bond_info column - database connection issue? Error: {err}"
);
}
};
let socks5_scores = calculate_socks5_percentiles(&gateways);
let res_gws = gateways
.iter()
.filter(|gw| gw.bonded)
.filter_map(|gw| match skimmed_nodes.get(&gw.gateway_identity_key) {
Some(skimmed_node) => Some((gw, skimmed_node)),
None => {
error!(
"CRITICAL: Gateway {} exists in gateways table but not in nym_nodes table! This should not happen.",
gw.gateway_identity_key
);
None
}
})
.filter_map(|(gw, skimmed_node)| {
let family = family_lookup.family_for_node(skimmed_node.node_id).cloned();
let staking = bond_info
.get(&skimmed_node.node_id)
.map(|details| NodeStakeInformation::from(&details.rewarding_details));
match DVpnGateway::new(
gw.clone(),
skimmed_node,
socks5_scores.get(&gw.gateway_identity_key),
family,
staking,
) {
Ok(gw) => Some(gw),
Err(err) => {
error!(
"CRITICAL: Failed to create DVpnGateway for node_id={}, identity_key={}: {}",
skimmed_node.node_id,
skimmed_node.ed25519_identity_pubkey.to_base58_string(),
err
);
// Don't panic here as this might be due to missing fields, but log it loudly
None
}
}
})
.filter(|gw| {
// gateways must have a country
if gw.location.two_letter_iso_country_code.len() == 2 {
true
} else {
warn!(
"Invalid country code: {}",
gw.location.two_letter_iso_country_code
);
false
}
})
// sort by country, then by identity key
.sorted_by_key(|item| {
(
item.location.two_letter_iso_country_code.clone(),
item.identity_key.clone(),
)
})
.collect::<Vec<_>>();
let bonded_count = gateways.iter().filter(|gw| gw.bonded).count();
tracing::info!(
"DVpn gateway filtering: {} total gateways, {} bonded, {} nym_nodes, {} final DVpn gateways",
started_with,
bonded_count,
skimmed_nodes.len(),
res_gws.len()
);
if res_gws.is_empty() && started_with > 0 {
tracing::error!(
"CRITICAL: Started with {} gateways but got 0 DVpn gateways!",
started_with
);
} else {
tracing::info!(
"Successfully loaded {} DVpn gateways into cache",
res_gws.len()
);
}
res_gws
}
pub async fn get_entry_dvpn_gateways(
&self,
db: &DbPool,
storage: &db::Storage,
min_node_version: &Version,
) -> Vec<DVpnGateway> {
self.get_dvpn_gateway_list(db, min_node_version)
self.get_dvpn_gateway_list(storage, min_node_version)
.await
.into_iter()
.filter(DVpnGateway::can_route_entry)
@@ -432,17 +475,21 @@ impl HttpCache {
pub async fn get_exit_dvpn_gateways(
&self,
db: &DbPool,
storage: &db::Storage,
min_node_version: &Version,
) -> Vec<DVpnGateway> {
self.get_dvpn_gateway_list(db, min_node_version)
self.get_dvpn_gateway_list(storage, min_node_version)
.await
.into_iter()
.filter(DVpnGateway::can_route_exit)
.collect()
}
pub async fn get_gateway_ips(&self, db: &DbPool, min_node_version: &Version) -> Vec<String> {
pub async fn get_gateway_ips(
&self,
storage: &db::Storage,
min_node_version: &Version,
) -> Vec<String> {
match self.gateway_ips.get(DVPN_GATEWAY_IPS).await {
Some(guard) => {
let read_lock = guard.read().await;
@@ -452,7 +499,7 @@ impl HttpCache {
trace!("No exit gateway IPs in cache, refreshing...");
let ips: Vec<String> = self
.get_dvpn_gateway_list(db, min_node_version)
.get_dvpn_gateway_list(storage, min_node_version)
.await
.into_iter()
.flat_map(|gw| gw.ip_addresses)
@@ -508,7 +555,7 @@ impl HttpCache {
pub async fn get_nym_nodes_list(
&self,
db: &DbPool,
storage: &db::Storage,
node_geocache: NodeGeoCache,
) -> anyhow::Result<Vec<ExtendedNymNode>> {
match self.nym_nodes.get(NYM_NODES_LIST_KEY).await {
@@ -520,7 +567,7 @@ impl HttpCache {
None => {
tracing::trace!("No nym nodes in cache, refreshing cache from DB...");
let nym_nodes = aggregate_node_info_from_db(db, node_geocache).await?;
let nym_nodes = aggregate_node_info_from_db(storage, node_geocache).await?;
if nym_nodes.is_empty() {
tracing::warn!("Database contains 0 nym nodes");
@@ -650,13 +697,13 @@ impl HttpCache {
#[instrument(level = "info", skip_all)]
async fn aggregate_node_info_from_db(
pool: &DbPool,
storage: &db::Storage,
node_geocache: NodeGeoCache,
) -> anyhow::Result<Vec<ExtendedNymNode>> {
let node_bond_info = queries::get_described_node_bond_info(pool).await?;
let node_bond_info = storage.get_described_node_bond_info().await?;
tracing::debug!("Described nodes with bond info: {}", node_bond_info.len());
let skimmed_nodes = queries::get_all_nym_nodes(pool).await.map(|records| {
let skimmed_nodes = storage.get_all_nym_nodes().await.map(|records| {
records
.into_iter()
.filter_map(|dto| SkimmedNodeV1::try_from(dto).ok())
@@ -665,10 +712,12 @@ async fn aggregate_node_info_from_db(
})?;
tracing::debug!("Skimmed nodes: {}", skimmed_nodes.len());
let described_nodes = queries::get_node_self_description(pool).await?;
let described_nodes = storage.get_node_self_description().await?;
tracing::debug!("Described nodes: {}", described_nodes.len());
let node_descriptions = queries::get_bonded_node_description(pool).await?;
let node_descriptions = storage.get_bonded_node_description().await?;
let families = load_family_lookup(storage).await?;
let mut parsed_nym_nodes = Vec::new();
for (node_id, described_node) in described_nodes {
@@ -722,6 +771,8 @@ async fn aggregate_node_info_from_db(
})
};
let family_data = families.family_for_node(node_id).cloned();
parsed_nym_nodes.push(ExtendedNymNode {
node_id,
identity_key,
@@ -737,6 +788,7 @@ async fn aggregate_node_info_from_db(
rewarding_details: rewarding_details.to_owned(),
description: node_description,
geoip,
family_data,
});
}
@@ -762,3 +814,54 @@ impl BinaryInfo {
pub(crate) struct HealthInfo {
pub(crate) uptime: i64,
}
struct LoadedNodeFamilies {
member_lookup: HashMap<NodeId, NodeFamilyId>,
family_lookup: HashMap<NodeFamilyId, NodeFamilyInformation>,
}
impl LoadedNodeFamilies {
/// Resolve the family `node_id` belongs to (if any) in one step.
fn family_for_node(&self, node_id: NodeId) -> Option<&NodeFamilyInformation> {
self.member_lookup
.get(&node_id)
.and_then(|fid| self.family_lookup.get(fid))
}
}
/// Load the families snapshot from DB and build two lookup maps:
/// `node_id → family_id` for member→family resolution, and
/// `family_id → NodeFamilyInformation` for hydrating gateway responses.
async fn load_family_lookup(storage: &db::Storage) -> anyhow::Result<LoadedNodeFamilies> {
let families = storage.get_all_node_families().await?;
let members = storage.get_all_node_family_members().await?;
let mut family_by_node = HashMap::new();
for m in &members {
let node_id = m.node_id as NodeId;
let family_id = m.family_id as NodeFamilyId;
family_by_node.insert(node_id, family_id);
}
let mut family_by_id: HashMap<u32, NodeFamilyInformation> = HashMap::new();
for f in families {
let family_id = f.family_id as u32;
let family_stake = f.family_stake_unym.unwrap_or_default() as u128;
let members_count = f.members_count as usize;
family_by_id.insert(
family_id,
NodeFamilyInformation::new(
family_id,
f.name,
f.description,
family_stake,
members_count,
),
);
}
Ok(LoadedNodeFamilies {
member_lookup: family_by_node,
family_lookup: family_by_id,
})
}
@@ -17,7 +17,7 @@ mod db;
mod http;
mod logging;
mod metrics_scraper;
mod monitor;
pub(crate) mod monitor;
mod node_scraper;
mod testruns;
mod ticketbook_manager;
@@ -182,7 +182,7 @@ async fn main() -> anyhow::Result<()> {
let shutdown_tracker = shutdown_manager.shutdown_tracker();
http::server::start_http_api(
storage.pool_owned(),
storage,
args.http_port,
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
@@ -3,9 +3,10 @@
use crate::db::models::{
ASSIGNED_ENTRY_COUNT, ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT,
GATEWAYS_HISTORICAL_COUNT, GatewayInsertRecord, MIXNODES_HISTORICAL_COUNT, NYMNODE_COUNT,
NYMNODES_DESCRIBED_COUNT, NetworkSummary, NymNodeInsertRecord, gateway, mixnode,
NYMNODES_DESCRIBED_COUNT, NetworkSummary, NodeFamilyInsertRecord, NodeFamilyMemberInsertRecord,
NymNodeInsertRecord, gateway, mixnode,
};
use crate::db::{DbPool, queries};
use crate::db::{DbPool, Storage};
use crate::utils::now_utc;
use crate::utils::{LogError, NumericalCheckedCast};
use moka::future::Cache;
@@ -23,14 +24,14 @@ use tracing::instrument;
pub(crate) use geodata::{ExplorerPrettyBond, IpInfoClient, Location};
pub(crate) use node_delegations::DelegationsCache;
mod geodata;
pub(crate) mod geodata;
mod node_delegations;
const MONITOR_FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
pub(crate) type NodeGeoCache = Cache<NodeId, Location>;
struct Monitor {
db_pool: DbPool,
storage: Storage,
network_details: NymNetworkDetails,
nym_api_client_timeout: Duration,
nyxd_client: QueryHttpRpcNyxdClient,
@@ -54,7 +55,7 @@ pub(crate) async fn run_in_background(
let ipinfo = IpInfoClient::new(ipinfo_api_token.clone());
let mut monitor = Monitor {
db_pool,
storage: Storage::from_pool(db_pool),
network_details: nym_network_defaults::NymNetworkDetails::new_from_env(),
nym_api_client_timeout,
nyxd_client,
@@ -94,7 +95,7 @@ pub(crate) async fn run_once(
let ipinfo = IpInfoClient::new(ipinfo_api_token.clone());
let mut monitor = Monitor {
db_pool,
storage: Storage::from_pool(db_pool),
network_details: nym_network_defaults::NymNetworkDetails::new_from_env(),
nym_api_client_timeout,
nyxd_client,
@@ -170,12 +171,26 @@ impl Monitor {
let nym_node_records =
self.prepare_nym_node_data(nym_nodes.clone(), &bonded_nym_nodes, &described_nodes);
queries::update_nym_nodes(&self.db_pool, nym_node_records)
self.storage
.update_nym_nodes(nym_node_records)
.await
.map(|inserted| {
tracing::debug!("{} nym nodes written to DB!", inserted);
})?;
let node_families = nym_api
.get_all_node_families()
.await
.log_error("get_all_node_families")?;
tracing::info!("🟣 node families: {}", node_families.len());
let family_records = prepare_node_family_data(node_families);
self.storage
.update_node_families(family_records)
.await
.map(|inserted| {
tracing::debug!("{inserted} node families written to DB!");
})?;
// stop here if running once
if exit_early {
return Ok(());
@@ -209,9 +224,9 @@ impl Monitor {
.prepare_gateway_data(&gateways, &nym_nodes, &bonded_nym_nodes)
.await?;
let pool = self.db_pool.clone();
let gateways_count = gateway_records.len();
queries::update_bonded_gateways(&pool, gateway_records)
self.storage
.update_bonded_gateways(gateway_records)
.await
.map(|_| {
tracing::debug!("{} gateway records written to DB!", gateways_count);
@@ -219,7 +234,7 @@ impl Monitor {
self.refresh_node_delegations(&bonded_nym_nodes).await;
let (all_historical_gateways, all_historical_mixnodes) = historical_count(&pool).await?;
let (all_historical_gateways, all_historical_mixnodes) = self.historical_count().await?;
//
// write summary keys and values to table
@@ -267,7 +282,9 @@ impl Monitor {
},
};
queries::insert_summaries(&pool, &nodes_summary, &network_summary, last_updated).await?;
self.storage
.insert_summaries(&nodes_summary, &network_summary, last_updated)
.await?;
let mut log_lines: Vec<String> = vec![];
for (key, value) in nodes_summary.iter() {
@@ -402,22 +419,65 @@ impl Monitor {
// update after refreshing all to avoid holding write lock for too long
*self.node_delegations.write().await = delegations_per_node;
}
async fn historical_count(&self) -> anyhow::Result<(usize, usize)> {
let mut conn = self.storage.pool().acquire().await?;
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
.fetch_one(&mut *conn)
.await?
.unwrap_or(0)
.cast_checked()?;
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
.fetch_one(&mut *conn)
.await?
.unwrap_or(0)
.cast_checked()?;
Ok((all_historical_gateways, all_historical_mixnodes))
}
}
async fn historical_count(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
/// Project a nym-api families snapshot into the shape stored in the
/// `node_families` / `node_family_members` tables. Members with stake info
/// missing on the nym-api side are still recorded — only the per-family
/// stake total drops to `NULL`.
fn prepare_node_family_data(
families: Vec<nym_api_requests::models::node_families::NodeFamily>,
) -> Vec<NodeFamilyInsertRecord> {
let last_updated_utc = now_utc().unix_timestamp();
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
.fetch_one(&mut *conn)
.await?
.unwrap_or(0)
.cast_checked()?;
families
.into_iter()
.map(|family| {
let family_stake_unym = family
.total_stake
.as_ref()
.and_then(|coin| i64::try_from(coin.amount.u128()).ok());
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
.fetch_one(&mut *conn)
.await?
.unwrap_or(0)
.cast_checked()?;
let members_count = family.members.len() as i32;
Ok((all_historical_gateways, all_historical_mixnodes))
let members: Vec<_> = family
.members
.into_iter()
.map(|m| NodeFamilyMemberInsertRecord {
node_id: m.node_id as i64,
joined_at: m.joined_at.unix_timestamp(),
})
.collect();
NodeFamilyInsertRecord {
family_id: family.id as i64,
name: family.name,
description: family.description,
owner: family.owner,
family_stake_unym,
members_count,
created_at: family.created_at.unix_timestamp(),
last_updated_utc,
members,
}
})
.collect()
}