Nym Statistics API (#5800)

* move stats types from vpn-client to here

* base stats api

* change storage schema

* add link to nymAPI for whitelisting

* remove outdated comment

* more comments update

* example of chrono vs time

* Add build.rs
- exports DATABASE_URL so cargo check works
- exports SQLX_OFFLINE for CI
- added pg_up.sh which spawns PG container
  - required for cargo sqlx prepare

* fixes time vs chrono issue and cleaner build with docker

* add correct swagger types, with feature locking where relevant

* apply dynco suggestions

---------

Co-authored-by: dynco-nym <173912580+dynco-nym@users.noreply.github.com>
This commit is contained in:
Simon Wicky
2025-05-28 10:23:11 +02:00
committed by GitHub
parent d27e3b49db
commit b69c2e1e94
28 changed files with 990 additions and 2 deletions
+1
View File
@@ -13,6 +13,7 @@ on:
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-statistics-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
- 'nyx-chain-watcher/**'
Generated
+31
View File
@@ -6831,6 +6831,36 @@ dependencies = [
"thiserror 2.0.12",
]
[[package]]
name = "nym-statistics-api"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.9",
"axum-extra",
"celes",
"clap",
"nym-bin-common",
"nym-http-api-client",
"nym-http-api-common",
"nym-statistics-common",
"nym-task",
"nym-validator-client",
"serde",
"serde_json",
"sqlx",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"url",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nym-statistics-common"
version = "0.1.0"
@@ -6851,6 +6881,7 @@ dependencies = [
"thiserror 2.0.12",
"time",
"tokio",
"utoipa",
"wasmtimer",
]
+2
View File
@@ -112,6 +112,7 @@ members = [
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"sdk/ffi/cpp",
@@ -152,6 +153,7 @@ default-members = [
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
+6
View File
@@ -28,5 +28,11 @@ nym-credentials-interface = { path = "../credentials-interface" }
nym-metrics = { path = "../nym-metrics" }
nym-task = { path = "../task" }
utoipa = { workspace = true, optional = true }
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
workspace = true
[features]
default = []
openapi = ["dep:utoipa"]
+1 -1
View File
@@ -1,7 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::report::{ClientStatsReport, OsInformation};
use crate::report::client::{ClientStatsReport, OsInformation};
use nym_task::TaskClient;
use time::{OffsetDateTime, Time};
+5
View File
@@ -26,11 +26,16 @@ pub mod report;
pub mod types;
const CLIENT_ID_PREFIX: &str = "client_stats_id";
const VPN_CLIENT_ID_PREFIX: &str = "vpnclient_stats_id";
pub fn generate_client_stats_id(id_key: ed25519::PublicKey) -> String {
generate_stats_id(CLIENT_ID_PREFIX, id_key.to_base58_string())
}
pub fn generate_vpn_client_stats_id<M: AsRef<[u8]>>(seed: M) -> String {
generate_stats_id(VPN_CLIENT_ID_PREFIX, seed)
}
fn generate_stats_id<M: AsRef<[u8]>>(prefix: &str, id_seed: M) -> String {
let mut hasher = sha2::Sha256::new();
hasher.update(prefix);
@@ -6,7 +6,7 @@ use crate::clients::{
nym_api_statistics::NymApiStats, packet_statistics::PacketStatistics,
};
use super::error::StatsError;
use crate::error::StatsError;
use serde::{Deserialize, Serialize};
use sysinfo::System;
+5
View File
@@ -0,0 +1,5 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod client;
pub mod vpn_client;
@@ -0,0 +1,51 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use serde::{Deserialize, Serialize};
const KIND: &str = "vpn_client_stats_report";
const VERSION: &str = "v1";
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VpnClientStatsReport {
pub kind: String,
pub api_version: String,
pub stats_id: String,
pub static_information: StaticInformationReport,
//SW called it basic so we can swap it easily down the line for more data
pub basic_usage: Option<UsageReport>,
}
impl VpnClientStatsReport {
pub fn new(stats_id: String, static_information: StaticInformationReport) -> Self {
VpnClientStatsReport {
kind: KIND.into(),
api_version: VERSION.into(),
stats_id,
static_information,
basic_usage: None,
}
}
#[must_use]
pub fn with_usage_report(mut self, usage_report: UsageReport) -> Self {
self.basic_usage = Some(usage_report);
self
}
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StaticInformationReport {
pub os_type: String,
pub os_version: Option<String>,
pub os_arch: String,
pub app_version: String,
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageReport {
pub connection_time_ms: Option<i32>,
pub two_hop: bool,
}
@@ -0,0 +1,21 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO active_device (\n day,\n device_id,\n os_type,\n os_version,\n architecture,\n app_version,\n user_agent,\n from_mixnet)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)\n ON CONFLICT (device_id, day) DO NOTHING",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Date",
"Text",
"Text",
"Text",
"Text",
"Text",
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO connection_stats (\n received_at,\n connection_time_ms,\n two_hop,\n source_ip,\n country_code,\n from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Int4",
"Bool",
"Text",
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba"
}
+55
View File
@@ -0,0 +1,55 @@
# Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-statistics-api"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[dependencies]
anyhow.workspace = true
axum = { workspace = true, features = ["tokio", "macros"] }
axum-extra = { workspace = true, features = ["typed-header"] }
celes.workspace = true
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"postgres",
"time",
] }
time.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tower-http = { workspace = true, features = ["cors", "trace"] }
url.workspace = true
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto.workspace = true
#internal
nym-bin-common = { path = "../common/bin-common" }
nym-http-api-client = { path = "../common/http-api-client" }
nym-http-api-common = { path = "../common/http-api-common", features = [
"middleware",
] }
nym-statistics-common = { path = "../common/statistics", features = [
"openapi",
] }
nym-task = { path = "../common/task" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
+28
View File
@@ -0,0 +1,28 @@
# Nym-statistics-api
A simple API to collect and store statistics sent by nym-vpn-client.
## Build instructions
The statistics API is backed by a PostgreSQL database so you'll need a PostgreSQL server running if you want to add migrations or add/modify SQL queries. I recommend https://postgresapp.com on MacOS, very easy to use. If you're on another OS, it's up to you.
Assuming your database is running at `postgresql://user:password@host:port/database_name` you'll likely need to run the following :
```bash
DATABASE_URL="postgresql://user:password@host:port/database_name"
# if you don't have an existing datase
sqlx database create --database-url $DATABASE_URL
sqlx migrate run --database-url $DATABASE_URL
# reset it if you messed with migrations while developping
sqlx database reset --database-url $DATABASE_URL
# or just run new migrations
sqlx migrate run --database-url $DATABASE_URL
# then prepare queries for offline build mode
cargo sqlx prepare --database-url $DATABASE_URL
```
This should allow `cargo build` without having any postgreSQL server running.
Be sure to add the `.sqlx` directory to version control
@@ -0,0 +1,24 @@
CREATE TABLE active_device (
day DATE NOT NULL,
device_id TEXT NOT NULL,
os_type TEXT,
os_version TEXT,
architecture TEXT,
app_version TEXT,
user_agent TEXT,
from_mixnet BOOLEAN,
PRIMARY KEY (device_id, day)
);
CREATE TABLE connection_stats (
received_at TIMESTAMP WITH TIME ZONE NOT NULL,
connection_time_ms INTEGER,
two_hop BOOLEAN,
source_ip TEXT,
country_code TEXT,
from_mixnet BOOLEAN
);
CREATE INDEX idx_active_device_day ON active_device (day);
+30
View File
@@ -0,0 +1,30 @@
#!/bin/bash
set -e
export PGUSER="nym"
export PGPASSWORD="password1"
export PGPORT="5432"
export DB_NAME="nym_statistics_api"
export DATABASE_URL="postgres://${PGUSER}:${PGPASSWORD}@localhost:${PGPORT}/${DB_NAME}"
cat <<EOF > .env
SQLX_OFFLINE=true
POSTGRES_USER=$PGUSER
POSTGRES_PASSWORD=$PGPASSWORD
PGPORT=$PGPORT
DB_NAME=$DB_NAME
DATABASE_URL=$DATABASE_URL
EOF
docker run --rm -it \
--name ${DB_NAME} \
-e POSTGRES_USER=${PGUSER} \
-e POSTGRES_PASSWORD=${PGPASSWORD} \
-e POSTGRES_DB=${DB_NAME} \
-p ${PGPORT}:${PGPORT} \
postgres
# sqlx migrate run
# cargo sqlx prepare
+38
View File
@@ -0,0 +1,38 @@
use clap::Parser;
use nym_bin_common::bin_info;
use std::sync::OnceLock;
use url::Url;
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Clone, Debug, Parser)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
/// URL for the NYM API to get a network view from
#[clap(long, env = "NYM_API_URL")]
pub(crate) nym_api_url: Option<Url>,
/// HTTP port on which to run statistics api.
#[clap(long, default_value_t = 8000, env = "NYM_STATISTICS_API_HTTP_PORT")]
pub(crate) http_port: u16,
/// Connection url for the database.
#[clap(long, env = "DATABASE_URL")]
pub(crate) database_url: String,
/// Username for the database.
#[clap(long, env = "POSTGRES_USER")]
pub(crate) username: String,
/// Password for the database.
#[clap(long, env = "POSTGRES_PASSWORD")]
pub(crate) password: String,
/// PgSQL port for the database.
#[clap(long, default_value_t = 5432, env = "PGPORT")]
pub(crate) pg_port: u16,
}
+74
View File
@@ -0,0 +1,74 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use nym_http_api_common::middleware::logging::log_request_info;
use tokio::net::ToSocketAddrs;
use tower_http::cors::CorsLayer;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::http::{server::HttpServer, state::AppState};
pub(crate) mod stats;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
}
impl RouterBuilder {
pub(crate) fn with_default_routes() -> Self {
let router = Router::new()
.merge(
SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest("/v1", Router::new().nest("/stats", stats::routes()));
Self {
unfinished_router: router,
}
}
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
RouterWithState {
router: self.finalize_routes().with_state(state),
}
}
fn finalize_routes(self) -> Router<AppState> {
// layers added later wrap earlier layers
self.unfinished_router
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(axum::middleware::from_fn(log_request_info))
}
}
pub(crate) struct RouterWithState {
router: Router,
}
impl RouterWithState {
pub(crate) async fn build_server<A: ToSocketAddrs>(
self,
bind_address: A,
) -> anyhow::Result<HttpServer> {
tokio::net::TcpListener::bind(bind_address)
.await
.map(|listener| HttpServer::new(self.router, listener))
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
}
}
fn setup_cors() -> CorsLayer {
use axum::http::Method;
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
.allow_headers(tower_http::cors::Any)
.allow_credentials(false)
}
+63
View File
@@ -0,0 +1,63 @@
use std::net::SocketAddr;
use axum::{
extract::{ConnectInfo, State},
Json, Router,
};
use axum_extra::{headers::UserAgent, TypedHeader};
use nym_statistics_common::report::vpn_client::VpnClientStatsReport;
use tracing::debug;
use crate::{
http::{
error::{HttpError, HttpResult},
state::AppState,
},
storage::models::{ConnectionInfoDto, DailyActiveDeviceDto},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/report", axum::routing::post(submit_stats_report))
}
#[utoipa::path(
post,
request_body = VpnClientStatsReport,
tag = "Stats",
path = "/report",
context_path = "/v1/stats",
responses(
(status = 200)
)
)]
#[tracing::instrument(level = "info", skip_all)]
async fn submit_stats_report(
State(mut state): State<AppState>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
TypedHeader(user_agent): TypedHeader<UserAgent>,
Json(report): Json<VpnClientStatsReport>,
) -> HttpResult<Json<()>> {
let now = time::OffsetDateTime::now_utc();
let gateway_record = state.network_view().get_country_by_ip(&addr.ip()).await;
let from_mixnet = gateway_record.is_some();
let maybe_location = gateway_record.unwrap_or_default();
if from_mixnet {
debug!("Received a report from the network");
} else {
debug!("Received a report from outside of the network");
}
let active_device = DailyActiveDeviceDto::new(now, &report, user_agent, from_mixnet);
let maybe_connection_info =
ConnectionInfoDto::maybe_new(now, &report, addr, maybe_location, from_mixnet);
state
.storage()
.store_vpn_client_report(active_device, maybe_connection_info)
.await
.map_err(HttpError::internal_with_logging)?;
Ok(Json(()))
}
+10
View File
@@ -0,0 +1,10 @@
use utoipa::OpenApi;
use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-statistics-api/src")]
#[derive(OpenApi)]
#[openapi(info(title = "Nym Statistics API"), tags(), components(schemas()))]
pub(super) struct ApiDoc;
+28
View File
@@ -0,0 +1,28 @@
use std::fmt::Display;
pub(crate) type HttpResult<T> = Result<T, HttpError>;
pub(crate) struct HttpError {
message: String,
status: axum::http::StatusCode,
}
impl HttpError {
pub(crate) fn internal_with_logging(msg: impl Display) -> Self {
tracing::error!("{}", msg.to_string());
Self::internal()
}
pub(crate) fn internal() -> Self {
Self {
message: serde_json::json!({"message": "Internal server error"}).to_string(),
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl axum::response::IntoResponse for HttpError {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
+5
View File
@@ -0,0 +1,5 @@
pub(crate) mod api;
pub(crate) mod api_docs;
pub(crate) mod error;
pub(crate) mod server;
pub(crate) mod state;
+48
View File
@@ -0,0 +1,48 @@
use axum::Router;
use core::net::SocketAddr;
use nym_task::ShutdownToken;
use tokio::net::TcpListener;
use crate::{
http::{api::RouterBuilder, state::AppState},
network_view::NetworkView,
storage::StatisticsStorage,
};
pub(crate) async fn build_http_api(
storage: StatisticsStorage,
cached_network: NetworkView,
http_port: u16,
) -> anyhow::Result<HttpServer> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(storage, cached_network).await;
let router = router_builder.with_state(state);
let bind_addr = format!("0.0.0.0:{}", http_port);
tracing::info!("Binding server to {bind_addr}");
router.build_server(bind_addr).await
}
pub(crate) struct HttpServer {
router: Router,
listener: TcpListener,
}
impl HttpServer {
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
Self { router, listener }
}
pub(crate) async fn run(self, shutdown_token: ShutdownToken) -> std::io::Result<()> {
// into_make_service_with_connect_info allows us to see client ip address
axum::serve(
self.listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async move { shutdown_token.cancelled().await })
.await
}
}
+24
View File
@@ -0,0 +1,24 @@
use crate::{network_view::NetworkView, storage::StatisticsStorage};
#[derive(Debug, Clone)]
pub(crate) struct AppState {
storage_manager: StatisticsStorage,
network_view: NetworkView,
}
impl AppState {
pub(crate) async fn new(storage_manager: StatisticsStorage, network_view: NetworkView) -> Self {
Self {
storage_manager,
network_view,
}
}
pub(crate) fn storage(&mut self) -> &mut StatisticsStorage {
&mut self.storage_manager
}
pub(crate) fn network_view(&self) -> &NetworkView {
&self.network_view
}
}
+44
View File
@@ -0,0 +1,44 @@
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{filter::Directive, EnvFilter};
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
directive.into().parse().map_err(From::from)
}
let log_builder = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
.with_thread_ids(true)
// Don't display the event's target (module path)
.with_target(false);
let mut filter = EnvFilter::builder()
// if RUST_LOG isn't set, set default level
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy();
// these crates are more granularly filtered
let warn_crates = [
"rustls",
"sqlx",
"tower_http",
"axum",
"reqwest",
"hyper_util",
];
for crate_name in warn_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
}
let log_level_hint = filter.max_level_hint();
log_builder.with_env_filter(filter).init();
tracing::info!("Log level: {:?}", log_level_hint);
Ok(())
}
+53
View File
@@ -0,0 +1,53 @@
use clap::Parser;
use network_view::NetworkRefresher;
use nym_task::ShutdownManager;
mod cli;
mod http;
mod logging;
mod network_view;
mod storage;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger()?;
let args = cli::Cli::parse();
let connection_url = args.database_url.clone();
tracing::debug!("Using config:\n{:#?}", args);
let storage = storage::StatisticsStorage::init(
connection_url,
args.username,
args.password,
args.pg_port,
)
.await?;
tracing::info!("Connection to database successful");
let shutdown_manager = ShutdownManager::new("nym-statistics-api");
let network_refresher = NetworkRefresher::initialise_new(
args.nym_api_url,
shutdown_manager.child_token("network-refresher"),
)
.await?;
let http_server =
http::server::build_http_api(storage, network_refresher.network_view(), args.http_port)
.await
.expect("Failed to build http server");
let server_shutdown = shutdown_manager.clone_token("http-api-server");
// Starting tasks
shutdown_manager.spawn(async move { http_server.run(server_shutdown).await });
network_refresher.start();
tracing::info!("Started HTTP server on port {}", args.http_port);
shutdown_manager.close();
shutdown_manager.wait_for_shutdown_signal().await;
Ok(())
}
+152
View File
@@ -0,0 +1,152 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use anyhow::Result;
use nym_task::ShutdownToken;
use celes::Country;
use nym_validator_client::models::NymNodeDescription;
use nym_validator_client::NymApiClient;
use std::collections::HashMap;
use std::time::Duration;
use std::{net::IpAddr, sync::Arc};
use tokio::sync::RwLock;
use tokio::time::interval;
use url::Url;
use tracing::{error, info, trace, warn};
const NETWORK_CACHE_TTL: Duration = Duration::from_secs(600);
type IpToCountryMap = HashMap<IpAddr, Option<Country>>;
// SW this should use a proper NS API client once it exists
struct NodesQuerier {
client: NymApiClient,
}
impl NodesQuerier {
async fn current_nymnodes(&self) -> Result<Vec<NymNodeDescription>> {
Ok(self
.client
.get_all_described_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))?)
}
}
#[derive(Clone, Debug)]
pub(crate) struct NetworkView {
inner: Arc<RwLock<NetworkViewInner>>,
}
impl NetworkView {
fn new_empty() -> Self {
NetworkView {
inner: Arc::new(RwLock::new(NetworkViewInner {
network_nodes: HashMap::new(),
})),
}
}
pub(crate) async fn get_country_by_ip(&self, ip_addr: &IpAddr) -> Option<Option<Country>> {
self.inner.read().await.network_nodes.get(ip_addr).copied()
}
}
#[derive(Debug)]
struct NetworkViewInner {
network_nodes: IpToCountryMap,
}
pub struct NetworkRefresher {
querier: Option<NodesQuerier>,
full_refresh_interval: Duration,
shutdown_token: ShutdownToken,
network: NetworkView,
}
impl NetworkRefresher {
pub(crate) async fn initialise_new(
maybe_nym_api_url: Option<Url>,
shutdown_token: ShutdownToken,
) -> Result<Self> {
let node_querier = match maybe_nym_api_url {
Some(url) => Some(NodesQuerier {
client: nym_http_api_client::Client::builder::<_, anyhow::Error>(url)?
.no_hickory_dns()
.with_user_agent("node-statistics-api")
.build::<anyhow::Error>()?
.into(),
}),
None => {
warn!("No Nym API specified, network view is unavailable");
None
}
};
let mut this = NetworkRefresher {
querier: node_querier,
full_refresh_interval: NETWORK_CACHE_TTL,
shutdown_token,
network: NetworkView::new_empty(),
};
this.refresh_network_nodes().await?;
Ok(this)
}
async fn refresh_network_nodes(&mut self) -> Result<()> {
if let Some(querier) = &self.querier {
let nodes = querier.current_nymnodes().await?;
// collect all known/allowed nodes information
let known_nodes = nodes
.iter()
.flat_map(|n| {
n.description
.host_information
.ip_address
.clone()
.into_iter()
.zip(std::iter::repeat(n.description.auxiliary_details.location))
})
.collect::<HashMap<_, _>>();
let mut network_guard = self.network.inner.write().await;
network_guard.network_nodes = known_nodes;
}
Ok(())
}
pub(crate) fn network_view(&self) -> NetworkView {
self.network.clone()
}
pub(crate) async fn run(&mut self) {
info!("NetworkRefresher started successfully");
let mut full_refresh_interval = interval(self.full_refresh_interval);
full_refresh_interval.reset();
while !self.shutdown_token.is_cancelled() {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("NetworkRefresher: Received shutdown");
}
_ = full_refresh_interval.tick() => {
if self.refresh_network_nodes().await.is_err() {
warn!("Failed to refresh network nodes, we're gonna keep the same set");
}
}
}
}
trace!("NetworkRefresher: Exiting");
}
pub(crate) fn start(mut self) {
tokio::spawn(async move { self.run().await });
}
}
+102
View File
@@ -0,0 +1,102 @@
use anyhow::{anyhow, Result};
use models::{ConnectionInfoDto, DailyActiveDeviceDto};
use sqlx::{migrate::Migrator, postgres::PgConnectOptions};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) type DbPool = sqlx::PgPool;
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
#[derive(Debug, Clone)]
pub(crate) struct StatisticsStorage {
connection_pool: DbPool,
}
impl StatisticsStorage {
pub async fn init(
connection_url: String,
user: String,
password: String,
port: u16,
) -> Result<Self> {
let connect_options = PgConnectOptions::from_str(&connection_url)?
.port(port)
.username(&user)
.password(&password)
.application_name(nym_bin_common::bin_info!().binary_name);
let pool = sqlx::PgPool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
Ok(StatisticsStorage {
connection_pool: pool,
})
}
pub(crate) async fn store_vpn_client_report(
&mut self,
active_device: DailyActiveDeviceDto,
connection_info: Option<ConnectionInfoDto>,
) -> Result<()> {
self.store_device(active_device).await?;
if let Some(connection_info) = connection_info {
self.store_connection_stats(connection_info).await?;
}
Ok(())
}
// Interestingly enough, because gateway-storage is using the `chrono` feature of sqlx and in 0.7.4 it takes priority over the `time` one, we cannot use the query! macro here.
// Due to features unification, the binary will not compile when built from the workspace root because it will expect `chrono` types.
// As a consequence, there is no compile time verification of these queries.
async fn store_device(&self, active_device: DailyActiveDeviceDto) -> Result<()> {
sqlx::query!(
r#"INSERT INTO active_device (
day,
device_id,
os_type,
os_version,
architecture,
app_version,
user_agent,
from_mixnet)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (device_id, day) DO NOTHING"#,
active_device.day as time::Date,
active_device.stats_id,
active_device.os_type,
active_device.os_version,
active_device.os_arch,
active_device.app_version,
active_device.user_agent,
active_device.from_mixnet
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
async fn store_connection_stats(&self, connection_info: ConnectionInfoDto) -> Result<()> {
sqlx::query!(
r#"INSERT INTO connection_stats (
received_at,
connection_time_ms,
two_hop,
source_ip,
country_code,
from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)"#,
connection_info.received_at as time::OffsetDateTime,
connection_info.connection_time_ms,
connection_info.two_hop,
connection_info.received_from,
connection_info.country_code,
connection_info.from_mixnet
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+69
View File
@@ -0,0 +1,69 @@
use std::net::SocketAddr;
use axum_extra::headers::UserAgent;
use celes::Country;
use nym_statistics_common::report::vpn_client::VpnClientStatsReport;
use time::{Date, OffsetDateTime};
pub type StatsId = String;
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct DailyActiveDeviceDto {
pub(crate) day: Date,
pub(crate) stats_id: StatsId,
pub(crate) os_type: String,
pub(crate) os_version: Option<String>,
pub(crate) os_arch: String,
pub(crate) app_version: String,
pub(crate) user_agent: String,
pub(crate) from_mixnet: bool,
}
impl DailyActiveDeviceDto {
pub(crate) fn new(
received_at: OffsetDateTime,
stats_report: &VpnClientStatsReport,
user_agent: UserAgent,
from_mixnet: bool,
) -> Self {
Self {
day: received_at.date(),
stats_id: stats_report.stats_id.clone(),
os_type: stats_report.static_information.os_type.clone(),
os_version: stats_report.static_information.os_version.clone(),
os_arch: stats_report.static_information.os_arch.clone(),
app_version: stats_report.static_information.app_version.clone(),
user_agent: user_agent.to_string(),
from_mixnet,
}
}
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct ConnectionInfoDto {
pub(crate) received_at: OffsetDateTime,
pub(crate) received_from: String,
pub(crate) connection_time_ms: Option<i32>,
pub(crate) two_hop: bool,
pub(crate) country_code: Option<String>,
pub(crate) from_mixnet: bool,
}
impl ConnectionInfoDto {
pub(crate) fn maybe_new(
received_at: OffsetDateTime,
stats_report: &VpnClientStatsReport,
received_from: SocketAddr,
maybe_country: Option<Country>,
from_mixnet: bool,
) -> Option<Self> {
stats_report.basic_usage.as_ref().map(|usage_report| Self {
received_at,
received_from: received_from.ip().to_string(),
connection_time_ms: usage_report.connection_time_ms,
two_hop: usage_report.two_hop,
country_code: maybe_country.map(|c| c.alpha2.into()),
from_mixnet,
})
}
}