From b69c2e1e9436fe87e57f6d72c0640c06f7eaf26d Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Wed, 28 May 2025 10:23:11 +0200 Subject: [PATCH] Nym Statistics API (#5800) * move stats types from vpn-client to here * base stats api * change storage schema * add link to nymAPI for whitelisting * remove outdated comment * more comments update * example of chrono vs time * Add build.rs - exports DATABASE_URL so cargo check works - exports SQLX_OFFLINE for CI - added pg_up.sh which spawns PG container - required for cargo sqlx prepare * fixes time vs chrono issue and cleaner build with docker * add correct swagger types, with feature locking where relevant * apply dynco suggestions --------- Co-authored-by: dynco-nym <173912580+dynco-nym@users.noreply.github.com> --- .github/workflows/ci-build.yml | 1 + Cargo.lock | 31 ++++ Cargo.toml | 2 + common/statistics/Cargo.toml | 6 + common/statistics/src/clients/mod.rs | 2 +- common/statistics/src/lib.rs | 5 + .../src/{report.rs => report/client.rs} | 2 +- common/statistics/src/report/mod.rs | 5 + common/statistics/src/report/vpn_client.rs | 51 ++++++ ...e4f607989ee95f68a0fcd95bc4a53f4e79cbb.json | 21 +++ ...3bea9553fb05a2f656cfff6598f12a21c99ba.json | 19 +++ nym-statistics-api/Cargo.toml | 55 +++++++ nym-statistics-api/README.md | 28 ++++ nym-statistics-api/migrations/001_init.sql | 24 +++ nym-statistics-api/pg_up.sh | 30 ++++ nym-statistics-api/src/cli/mod.rs | 38 +++++ nym-statistics-api/src/http/api/mod.rs | 74 +++++++++ nym-statistics-api/src/http/api/stats.rs | 63 ++++++++ nym-statistics-api/src/http/api_docs.rs | 10 ++ nym-statistics-api/src/http/error.rs | 28 ++++ nym-statistics-api/src/http/mod.rs | 5 + nym-statistics-api/src/http/server.rs | 48 ++++++ nym-statistics-api/src/http/state.rs | 24 +++ nym-statistics-api/src/logging.rs | 44 +++++ nym-statistics-api/src/main.rs | 53 ++++++ nym-statistics-api/src/network_view.rs | 152 ++++++++++++++++++ nym-statistics-api/src/storage/mod.rs | 102 ++++++++++++ 
nym-statistics-api/src/storage/models.rs | 69 ++++++++ 28 files changed, 990 insertions(+), 2 deletions(-) rename common/statistics/src/{report.rs => report/client.rs} (98%) create mode 100644 common/statistics/src/report/mod.rs create mode 100644 common/statistics/src/report/vpn_client.rs create mode 100644 nym-statistics-api/.sqlx/query-13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb.json create mode 100644 nym-statistics-api/.sqlx/query-dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba.json create mode 100644 nym-statistics-api/Cargo.toml create mode 100644 nym-statistics-api/README.md create mode 100644 nym-statistics-api/migrations/001_init.sql create mode 100755 nym-statistics-api/pg_up.sh create mode 100644 nym-statistics-api/src/cli/mod.rs create mode 100644 nym-statistics-api/src/http/api/mod.rs create mode 100644 nym-statistics-api/src/http/api/stats.rs create mode 100644 nym-statistics-api/src/http/api_docs.rs create mode 100644 nym-statistics-api/src/http/error.rs create mode 100644 nym-statistics-api/src/http/mod.rs create mode 100644 nym-statistics-api/src/http/server.rs create mode 100644 nym-statistics-api/src/http/state.rs create mode 100644 nym-statistics-api/src/logging.rs create mode 100644 nym-statistics-api/src/main.rs create mode 100644 nym-statistics-api/src/network_view.rs create mode 100644 nym-statistics-api/src/storage/mod.rs create mode 100644 nym-statistics-api/src/storage/models.rs diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index 68ba52e81e..5863df8822 100644 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -13,6 +13,7 @@ on: - 'nym-network-monitor/**' - 'nym-node/**' - 'nym-node-status-api/**' + - 'nym-statistics-api/**' - 'nym-outfox/**' - 'nym-validator-rewarder/**' - 'nyx-chain-watcher/**' diff --git a/Cargo.lock b/Cargo.lock index 3f2747f1ca..a243d3ce70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6831,6 +6831,36 @@ dependencies = [ 
"thiserror 2.0.12", ] +[[package]] +name = "nym-statistics-api" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum 0.7.9", + "axum-extra", + "celes", + "clap", + "nym-bin-common", + "nym-http-api-client", + "nym-http-api-common", + "nym-statistics-common", + "nym-task", + "nym-validator-client", + "serde", + "serde_json", + "sqlx", + "time", + "tokio", + "tokio-util", + "tower-http", + "tracing", + "tracing-subscriber", + "url", + "utoipa", + "utoipa-swagger-ui", + "utoipauto", +] + [[package]] name = "nym-statistics-common" version = "0.1.0" @@ -6851,6 +6881,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tokio", + "utoipa", "wasmtimer", ] diff --git a/Cargo.toml b/Cargo.toml index cd9914aa61..36bf8931b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ members = [ "nym-node/nym-node-metrics", "nym-node/nym-node-requests", "nym-outfox", + "nym-statistics-api", "nym-validator-rewarder", "nyx-chain-watcher", "sdk/ffi/cpp", @@ -152,6 +153,7 @@ default-members = [ "nym-node", "nym-node-status-api/nym-node-status-agent", "nym-node-status-api/nym-node-status-api", + "nym-statistics-api", "nym-validator-rewarder", "nyx-chain-watcher", "service-providers/authenticator", diff --git a/common/statistics/Cargo.toml b/common/statistics/Cargo.toml index 7ddd23a44b..1bbd7317c3 100644 --- a/common/statistics/Cargo.toml +++ b/common/statistics/Cargo.toml @@ -28,5 +28,11 @@ nym-credentials-interface = { path = "../credentials-interface" } nym-metrics = { path = "../nym-metrics" } nym-task = { path = "../task" } +utoipa = { workspace = true, optional = true } + [target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer] workspace = true + +[features] +default = [] +openapi = ["dep:utoipa"] diff --git a/common/statistics/src/clients/mod.rs b/common/statistics/src/clients/mod.rs index d9c29b48c2..2ca1fa006d 100644 --- a/common/statistics/src/clients/mod.rs +++ b/common/statistics/src/clients/mod.rs @@ -1,7 +1,7 @@ // Copyright 2024 - Nym Technologies SA // 
SPDX-License-Identifier: Apache-2.0 -use crate::report::{ClientStatsReport, OsInformation}; +use crate::report::client::{ClientStatsReport, OsInformation}; use nym_task::TaskClient; use time::{OffsetDateTime, Time}; diff --git a/common/statistics/src/lib.rs b/common/statistics/src/lib.rs index 55175984ad..6eb1eea7ff 100644 --- a/common/statistics/src/lib.rs +++ b/common/statistics/src/lib.rs @@ -26,11 +26,16 @@ pub mod report; pub mod types; const CLIENT_ID_PREFIX: &str = "client_stats_id"; +const VPN_CLIENT_ID_PREFIX: &str = "vpnclient_stats_id"; pub fn generate_client_stats_id(id_key: ed25519::PublicKey) -> String { generate_stats_id(CLIENT_ID_PREFIX, id_key.to_base58_string()) } +pub fn generate_vpn_client_stats_id>(seed: M) -> String { + generate_stats_id(VPN_CLIENT_ID_PREFIX, seed) +} + fn generate_stats_id>(prefix: &str, id_seed: M) -> String { let mut hasher = sha2::Sha256::new(); hasher.update(prefix); diff --git a/common/statistics/src/report.rs b/common/statistics/src/report/client.rs similarity index 98% rename from common/statistics/src/report.rs rename to common/statistics/src/report/client.rs index f01cf2be06..0a1e61364e 100644 --- a/common/statistics/src/report.rs +++ b/common/statistics/src/report/client.rs @@ -6,7 +6,7 @@ use crate::clients::{ nym_api_statistics::NymApiStats, packet_statistics::PacketStatistics, }; -use super::error::StatsError; +use crate::error::StatsError; use serde::{Deserialize, Serialize}; use sysinfo::System; diff --git a/common/statistics/src/report/mod.rs b/common/statistics/src/report/mod.rs new file mode 100644 index 0000000000..27f117f9ff --- /dev/null +++ b/common/statistics/src/report/mod.rs @@ -0,0 +1,5 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +pub mod client; +pub mod vpn_client; diff --git a/common/statistics/src/report/vpn_client.rs b/common/statistics/src/report/vpn_client.rs new file mode 100644 index 0000000000..ab1260cae5 --- /dev/null +++ 
b/common/statistics/src/report/vpn_client.rs @@ -0,0 +1,51 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use serde::{Deserialize, Serialize}; + +const KIND: &str = "vpn_client_stats_report"; +const VERSION: &str = "v1"; + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VpnClientStatsReport { + pub kind: String, + pub api_version: String, + pub stats_id: String, + pub static_information: StaticInformationReport, + //SW called it basic so we can swap it easily down the line for more data + pub basic_usage: Option, +} + +impl VpnClientStatsReport { + pub fn new(stats_id: String, static_information: StaticInformationReport) -> Self { + VpnClientStatsReport { + kind: KIND.into(), + api_version: VERSION.into(), + stats_id, + static_information, + basic_usage: None, + } + } + + #[must_use] + pub fn with_usage_report(mut self, usage_report: UsageReport) -> Self { + self.basic_usage = Some(usage_report); + self + } +} +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StaticInformationReport { + pub os_type: String, + pub os_version: Option, + pub os_arch: String, + pub app_version: String, +} + +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UsageReport { + pub connection_time_ms: Option, + pub two_hop: bool, +} diff --git a/nym-statistics-api/.sqlx/query-13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb.json b/nym-statistics-api/.sqlx/query-13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb.json new file mode 100644 index 0000000000..7525b8cc22 --- /dev/null +++ b/nym-statistics-api/.sqlx/query-13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb.json @@ -0,0 +1,21 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO active_device (\n day,\n device_id,\n os_type,\n 
os_version,\n architecture,\n app_version,\n user_agent,\n from_mixnet)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)\n ON CONFLICT (device_id, day) DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Date", + "Text", + "Text", + "Text", + "Text", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "13bf07e42c49ea365e816eb94e4e4f607989ee95f68a0fcd95bc4a53f4e79cbb" +} diff --git a/nym-statistics-api/.sqlx/query-dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba.json b/nym-statistics-api/.sqlx/query-dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba.json new file mode 100644 index 0000000000..b477ea243d --- /dev/null +++ b/nym-statistics-api/.sqlx/query-dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO connection_stats (\n received_at,\n connection_time_ms,\n two_hop,\n source_ip,\n country_code,\n from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Int4", + "Bool", + "Text", + "Text", + "Bool" + ] + }, + "nullable": [] + }, + "hash": "dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba" +} diff --git a/nym-statistics-api/Cargo.toml b/nym-statistics-api/Cargo.toml new file mode 100644 index 0000000000..c0357fa5a0 --- /dev/null +++ b/nym-statistics-api/Cargo.toml @@ -0,0 +1,55 @@ +# Copyright 2025 - Nym Technologies SA +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "nym-statistics-api" +version = "0.1.0" +authors.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true + + +[dependencies] +anyhow.workspace = true +axum = { workspace = true, features = ["tokio", "macros"] } +axum-extra = { workspace = true, features = ["typed-header"] } +celes.workspace = true +clap = 
{ workspace = true, features = ["cargo", "derive", "env", "string"] } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +sqlx = { workspace = true, features = [ + "runtime-tokio-rustls", + "postgres", + "time", +] } +time.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-util.workspace = true +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tower-http = { workspace = true, features = ["cors", "trace"] } +url.workspace = true +utoipa = { workspace = true, features = ["axum_extras", "time"] } +utoipa-swagger-ui = { workspace = true, features = ["axum"] } +utoipauto.workspace = true + +#internal +nym-bin-common = { path = "../common/bin-common" } +nym-http-api-client = { path = "../common/http-api-client" } +nym-http-api-common = { path = "../common/http-api-common", features = [ + "middleware", +] } +nym-statistics-common = { path = "../common/statistics", features = [ + "openapi", +] } +nym-task = { path = "../common/task" } +nym-validator-client = { path = "../common/client-libs/validator-client" } + +[build-dependencies] +anyhow = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } diff --git a/nym-statistics-api/README.md b/nym-statistics-api/README.md new file mode 100644 index 0000000000..8b633c4631 --- /dev/null +++ b/nym-statistics-api/README.md @@ -0,0 +1,28 @@ +# Nym-statistics-api + +A simple API to collect and store statistics sent by nym-vpn-client. + +## Build instructions + +The statistics API is backed by a PostgreSQL database so you'll need a PostgreSQL server running if you want to add migrations or add/modify SQL queries. I recommend https://postgresapp.com on MacOS, very easy to use. If you're on another OS, it's up to you. 
+ +Assuming your database is running at `postgresql://user:password@host:port/database_name` you'll likely need to run the following: +```bash +DATABASE_URL="postgresql://user:password@host:port/database_name" + +# if you don't have an existing database +sqlx database create --database-url $DATABASE_URL +sqlx migrate run --database-url $DATABASE_URL + +# reset it if you messed with migrations while developing +sqlx database reset --database-url $DATABASE_URL + +# or just run new migrations +sqlx migrate run --database-url $DATABASE_URL + +# then prepare queries for offline build mode +cargo sqlx prepare --database-url $DATABASE_URL +``` + +This should allow `cargo build` without having any PostgreSQL server running. +Be sure to add the `.sqlx` directory to version control diff --git a/nym-statistics-api/migrations/001_init.sql b/nym-statistics-api/migrations/001_init.sql new file mode 100644 index 0000000000..739df793bd --- /dev/null +++ b/nym-statistics-api/migrations/001_init.sql @@ -0,0 +1,24 @@ + +CREATE TABLE active_device ( + day DATE NOT NULL, + device_id TEXT NOT NULL, + os_type TEXT, + os_version TEXT, + architecture TEXT, + app_version TEXT, + user_agent TEXT, + from_mixnet BOOLEAN, + PRIMARY KEY (device_id, day) +); + +CREATE TABLE connection_stats ( + received_at TIMESTAMP WITH TIME ZONE NOT NULL, + connection_time_ms INTEGER, + two_hop BOOLEAN, + source_ip TEXT, + country_code TEXT, + from_mixnet BOOLEAN +); + + +CREATE INDEX idx_active_device_day ON active_device (day); \ No newline at end of file diff --git a/nym-statistics-api/pg_up.sh b/nym-statistics-api/pg_up.sh new file mode 100755 index 0000000000..963efb6a91 --- /dev/null +++ b/nym-statistics-api/pg_up.sh @@ -0,0 +1,30 @@ +#!/bin/bash +set -e + +export PGUSER="nym" +export PGPASSWORD="password1" +export PGPORT="5432" +export DB_NAME="nym_statistics_api" +export DATABASE_URL="postgres://${PGUSER}:${PGPASSWORD}@localhost:${PGPORT}/${DB_NAME}" + +cat < .env +SQLX_OFFLINE=true 
+POSTGRES_USER=$PGUSER +POSTGRES_PASSWORD=$PGPASSWORD +PGPORT=$PGPORT +DB_NAME=$DB_NAME +DATABASE_URL=$DATABASE_URL +EOF + + +docker run --rm -it \ + --name ${DB_NAME} \ + -e POSTGRES_USER=${PGUSER} \ + -e POSTGRES_PASSWORD=${PGPASSWORD} \ + -e POSTGRES_DB=${DB_NAME} \ + -p ${PGPORT}:${PGPORT} \ + postgres + + +# sqlx migrate run +# cargo sqlx prepare \ No newline at end of file diff --git a/nym-statistics-api/src/cli/mod.rs b/nym-statistics-api/src/cli/mod.rs new file mode 100644 index 0000000000..5a8373fa4a --- /dev/null +++ b/nym-statistics-api/src/cli/mod.rs @@ -0,0 +1,38 @@ +use clap::Parser; +use nym_bin_common::bin_info; +use std::sync::OnceLock; +use url::Url; + +// Helper for passing LONG_VERSION to clap +fn pretty_build_info_static() -> &'static str { + static PRETTY_BUILD_INFORMATION: OnceLock = OnceLock::new(); + PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print()) +} + +#[derive(Clone, Debug, Parser)] +#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)] +pub(crate) struct Cli { + /// URL for the NYM API to get a network view from + #[clap(long, env = "NYM_API_URL")] + pub(crate) nym_api_url: Option, + + /// HTTP port on which to run statistics api. + #[clap(long, default_value_t = 8000, env = "NYM_STATISTICS_API_HTTP_PORT")] + pub(crate) http_port: u16, + + /// Connection url for the database. + #[clap(long, env = "DATABASE_URL")] + pub(crate) database_url: String, + + /// Username for the database. + #[clap(long, env = "POSTGRES_USER")] + pub(crate) username: String, + + /// Password for the database. + #[clap(long, env = "POSTGRES_PASSWORD")] + pub(crate) password: String, + + /// PgSQL port for the database. 
+ #[clap(long, default_value_t = 5432, env = "PGPORT")] + pub(crate) pg_port: u16, +} diff --git a/nym-statistics-api/src/http/api/mod.rs b/nym-statistics-api/src/http/api/mod.rs new file mode 100644 index 0000000000..9e3fc313d3 --- /dev/null +++ b/nym-statistics-api/src/http/api/mod.rs @@ -0,0 +1,74 @@ +use anyhow::anyhow; +use axum::{response::Redirect, Router}; +use nym_http_api_common::middleware::logging::log_request_info; +use tokio::net::ToSocketAddrs; +use tower_http::cors::CorsLayer; +use utoipa::OpenApi; +use utoipa_swagger_ui::SwaggerUi; + +use crate::http::{server::HttpServer, state::AppState}; + +pub(crate) mod stats; + +pub(crate) struct RouterBuilder { + unfinished_router: Router, +} + +impl RouterBuilder { + pub(crate) fn with_default_routes() -> Self { + let router = Router::new() + .merge( + SwaggerUi::new("/swagger") + .url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()), + ) + .route( + "/", + axum::routing::get(|| async { Redirect::permanent("/swagger") }), + ) + .nest("/v1", Router::new().nest("/stats", stats::routes())); + + Self { + unfinished_router: router, + } + } + + pub(crate) fn with_state(self, state: AppState) -> RouterWithState { + RouterWithState { + router: self.finalize_routes().with_state(state), + } + } + + fn finalize_routes(self) -> Router { + // layers added later wrap earlier layers + self.unfinished_router + // CORS layer needs to wrap other API layers + .layer(setup_cors()) + // logger should be outermost layer + .layer(axum::middleware::from_fn(log_request_info)) + } +} + +pub(crate) struct RouterWithState { + router: Router, +} + +impl RouterWithState { + pub(crate) async fn build_server( + self, + bind_address: A, + ) -> anyhow::Result { + tokio::net::TcpListener::bind(bind_address) + .await + .map(|listener| HttpServer::new(self.router, listener)) + .map_err(|err| anyhow!("Couldn't bind to address due to {}", err)) + } +} + +fn setup_cors() -> CorsLayer { + use axum::http::Method; + CorsLayer::new() + 
.allow_origin(tower_http::cors::Any) + .allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS]) + .allow_headers(tower_http::cors::Any) + .allow_credentials(false) +} diff --git a/nym-statistics-api/src/http/api/stats.rs b/nym-statistics-api/src/http/api/stats.rs new file mode 100644 index 0000000000..4490535cc4 --- /dev/null +++ b/nym-statistics-api/src/http/api/stats.rs @@ -0,0 +1,63 @@ +use std::net::SocketAddr; + +use axum::{ + extract::{ConnectInfo, State}, + Json, Router, +}; +use axum_extra::{headers::UserAgent, TypedHeader}; +use nym_statistics_common::report::vpn_client::VpnClientStatsReport; +use tracing::debug; + +use crate::{ + http::{ + error::{HttpError, HttpResult}, + state::AppState, + }, + storage::models::{ConnectionInfoDto, DailyActiveDeviceDto}, +}; + +pub(crate) fn routes() -> Router { + Router::new().route("/report", axum::routing::post(submit_stats_report)) +} + +#[utoipa::path( + post, + request_body = VpnClientStatsReport, + tag = "Stats", + path = "/report", + context_path = "/v1/stats", + responses( + (status = 200) + ) +)] +#[tracing::instrument(level = "info", skip_all)] +async fn submit_stats_report( + State(mut state): State, + ConnectInfo(addr): ConnectInfo, + TypedHeader(user_agent): TypedHeader, + Json(report): Json, +) -> HttpResult> { + let now = time::OffsetDateTime::now_utc(); + let gateway_record = state.network_view().get_country_by_ip(&addr.ip()).await; + + let from_mixnet = gateway_record.is_some(); + let maybe_location = gateway_record.unwrap_or_default(); + + if from_mixnet { + debug!("Received a report from the network"); + } else { + debug!("Received a report from outside of the network"); + } + + let active_device = DailyActiveDeviceDto::new(now, &report, user_agent, from_mixnet); + let maybe_connection_info = + ConnectionInfoDto::maybe_new(now, &report, addr, maybe_location, from_mixnet); + + state + .storage() + .store_vpn_client_report(active_device, maybe_connection_info) + .await + 
.map_err(HttpError::internal_with_logging)?; + + Ok(Json(())) +} diff --git a/nym-statistics-api/src/http/api_docs.rs b/nym-statistics-api/src/http/api_docs.rs new file mode 100644 index 0000000000..b3a644a0bb --- /dev/null +++ b/nym-statistics-api/src/http/api_docs.rs @@ -0,0 +1,10 @@ +use utoipa::OpenApi; +use utoipauto::utoipauto; + +// manually import external structs which are behind feature flags because they +// can't be automatically discovered +// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829 +#[utoipauto(paths = "./nym-statistics-api/src")] +#[derive(OpenApi)] +#[openapi(info(title = "Nym Statistics API"), tags(), components(schemas()))] +pub(super) struct ApiDoc; diff --git a/nym-statistics-api/src/http/error.rs b/nym-statistics-api/src/http/error.rs new file mode 100644 index 0000000000..15119ac67e --- /dev/null +++ b/nym-statistics-api/src/http/error.rs @@ -0,0 +1,28 @@ +use std::fmt::Display; + +pub(crate) type HttpResult = Result; + +pub(crate) struct HttpError { + message: String, + status: axum::http::StatusCode, +} + +impl HttpError { + pub(crate) fn internal_with_logging(msg: impl Display) -> Self { + tracing::error!("{}", msg.to_string()); + Self::internal() + } + + pub(crate) fn internal() -> Self { + Self { + message: serde_json::json!({"message": "Internal server error"}).to_string(), + status: axum::http::StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +impl axum::response::IntoResponse for HttpError { + fn into_response(self) -> axum::response::Response { + (self.status, self.message).into_response() + } +} diff --git a/nym-statistics-api/src/http/mod.rs b/nym-statistics-api/src/http/mod.rs new file mode 100644 index 0000000000..1506514f0f --- /dev/null +++ b/nym-statistics-api/src/http/mod.rs @@ -0,0 +1,5 @@ +pub(crate) mod api; +pub(crate) mod api_docs; +pub(crate) mod error; +pub(crate) mod server; +pub(crate) mod state; diff --git a/nym-statistics-api/src/http/server.rs 
b/nym-statistics-api/src/http/server.rs new file mode 100644 index 0000000000..3aa57c7da6 --- /dev/null +++ b/nym-statistics-api/src/http/server.rs @@ -0,0 +1,48 @@ +use axum::Router; +use core::net::SocketAddr; +use nym_task::ShutdownToken; +use tokio::net::TcpListener; + +use crate::{ + http::{api::RouterBuilder, state::AppState}, + network_view::NetworkView, + storage::StatisticsStorage, +}; + +pub(crate) async fn build_http_api( + storage: StatisticsStorage, + cached_network: NetworkView, + http_port: u16, +) -> anyhow::Result { + let router_builder = RouterBuilder::with_default_routes(); + + let state = AppState::new(storage, cached_network).await; + let router = router_builder.with_state(state); + + let bind_addr = format!("0.0.0.0:{}", http_port); + tracing::info!("Binding server to {bind_addr}"); + + router.build_server(bind_addr).await +} + +pub(crate) struct HttpServer { + router: Router, + listener: TcpListener, +} + +impl HttpServer { + pub(crate) fn new(router: Router, listener: TcpListener) -> Self { + Self { router, listener } + } + + pub(crate) async fn run(self, shutdown_token: ShutdownToken) -> std::io::Result<()> { + // into_make_service_with_connect_info allows us to see client ip address + axum::serve( + self.listener, + self.router + .into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(async move { shutdown_token.cancelled().await }) + .await + } +} diff --git a/nym-statistics-api/src/http/state.rs b/nym-statistics-api/src/http/state.rs new file mode 100644 index 0000000000..0f17aa5b84 --- /dev/null +++ b/nym-statistics-api/src/http/state.rs @@ -0,0 +1,24 @@ +use crate::{network_view::NetworkView, storage::StatisticsStorage}; + +#[derive(Debug, Clone)] +pub(crate) struct AppState { + storage_manager: StatisticsStorage, + network_view: NetworkView, +} + +impl AppState { + pub(crate) async fn new(storage_manager: StatisticsStorage, network_view: NetworkView) -> Self { + Self { + storage_manager, + network_view, + } + } + + 
pub(crate) fn storage(&mut self) -> &mut StatisticsStorage { + &mut self.storage_manager + } + + pub(crate) fn network_view(&self) -> &NetworkView { + &self.network_view + } +} diff --git a/nym-statistics-api/src/logging.rs b/nym-statistics-api/src/logging.rs new file mode 100644 index 0000000000..1adfa31b71 --- /dev/null +++ b/nym-statistics-api/src/logging.rs @@ -0,0 +1,44 @@ +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{filter::Directive, EnvFilter}; + +pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> { + fn directive_checked(directive: impl Into) -> anyhow::Result { + directive.into().parse().map_err(From::from) + } + + let log_builder = tracing_subscriber::fmt() + // Use a more compact, abbreviated log format + .compact() + // Display source code file paths + .with_file(true) + // Display source code line numbers + .with_line_number(true) + .with_thread_ids(true) + // Don't display the event's target (module path) + .with_target(false); + + let mut filter = EnvFilter::builder() + // if RUST_LOG isn't set, set default level + .with_default_directive(LevelFilter::DEBUG.into()) + .from_env_lossy(); + + // these crates are more granularly filtered + let warn_crates = [ + "rustls", + "sqlx", + "tower_http", + "axum", + "reqwest", + "hyper_util", + ]; + for crate_name in warn_crates { + filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?); + } + + let log_level_hint = filter.max_level_hint(); + + log_builder.with_env_filter(filter).init(); + tracing::info!("Log level: {:?}", log_level_hint); + + Ok(()) +} diff --git a/nym-statistics-api/src/main.rs b/nym-statistics-api/src/main.rs new file mode 100644 index 0000000000..f08896dbc5 --- /dev/null +++ b/nym-statistics-api/src/main.rs @@ -0,0 +1,53 @@ +use clap::Parser; +use network_view::NetworkRefresher; +use nym_task::ShutdownManager; + +mod cli; +mod http; +mod logging; +mod network_view; +mod storage; + +#[tokio::main] +async fn main() -> 
anyhow::Result<()> { + logging::setup_tracing_logger()?; + + let args = cli::Cli::parse(); + + let connection_url = args.database_url.clone(); + tracing::debug!("Using config:\n{:#?}", args); + + let storage = storage::StatisticsStorage::init( + connection_url, + args.username, + args.password, + args.pg_port, + ) + .await?; + tracing::info!("Connection to database successful"); + + let shutdown_manager = ShutdownManager::new("nym-statistics-api"); + + let network_refresher = NetworkRefresher::initialise_new( + args.nym_api_url, + shutdown_manager.child_token("network-refresher"), + ) + .await?; + + let http_server = + http::server::build_http_api(storage, network_refresher.network_view(), args.http_port) + .await + .expect("Failed to build http server"); + let server_shutdown = shutdown_manager.clone_token("http-api-server"); + + // Starting tasks + shutdown_manager.spawn(async move { http_server.run(server_shutdown).await }); + network_refresher.start(); + + tracing::info!("Started HTTP server on port {}", args.http_port); + + shutdown_manager.close(); + shutdown_manager.wait_for_shutdown_signal().await; + + Ok(()) +} diff --git a/nym-statistics-api/src/network_view.rs b/nym-statistics-api/src/network_view.rs new file mode 100644 index 0000000000..159b1a0fa2 --- /dev/null +++ b/nym-statistics-api/src/network_view.rs @@ -0,0 +1,152 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use anyhow::Result; +use nym_task::ShutdownToken; + +use celes::Country; +use nym_validator_client::models::NymNodeDescription; +use nym_validator_client::NymApiClient; +use std::collections::HashMap; +use std::time::Duration; +use std::{net::IpAddr, sync::Arc}; +use tokio::sync::RwLock; +use tokio::time::interval; +use url::Url; + +use tracing::{error, info, trace, warn}; + +const NETWORK_CACHE_TTL: Duration = Duration::from_secs(600); + +type IpToCountryMap = HashMap>; + +// SW this should use a proper NS API client once it exists +struct 
NodesQuerier { + client: NymApiClient, +} + +impl NodesQuerier { + async fn current_nymnodes(&self) -> Result> { + Ok(self + .client + .get_all_described_nodes() + .await + .inspect_err(|err| error!("failed to get network nodes: {err}"))?) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct NetworkView { + inner: Arc>, +} + +impl NetworkView { + fn new_empty() -> Self { + NetworkView { + inner: Arc::new(RwLock::new(NetworkViewInner { + network_nodes: HashMap::new(), + })), + } + } + + pub(crate) async fn get_country_by_ip(&self, ip_addr: &IpAddr) -> Option> { + self.inner.read().await.network_nodes.get(ip_addr).copied() + } +} + +#[derive(Debug)] +struct NetworkViewInner { + network_nodes: IpToCountryMap, +} + +pub struct NetworkRefresher { + querier: Option, + full_refresh_interval: Duration, + shutdown_token: ShutdownToken, + + network: NetworkView, +} + +impl NetworkRefresher { + pub(crate) async fn initialise_new( + maybe_nym_api_url: Option, + shutdown_token: ShutdownToken, + ) -> Result { + let node_querier = match maybe_nym_api_url { + Some(url) => Some(NodesQuerier { + client: nym_http_api_client::Client::builder::<_, anyhow::Error>(url)? + .no_hickory_dns() + .with_user_agent("node-statistics-api") + .build::()? 
+ .into(), + }), + None => { + warn!("No Nym API specified, network view is unavailable"); + None + } + }; + + let mut this = NetworkRefresher { + querier: node_querier, + full_refresh_interval: NETWORK_CACHE_TTL, + shutdown_token, + network: NetworkView::new_empty(), + }; + + this.refresh_network_nodes().await?; + Ok(this) + } + + /* Refreshes the cached node set. When a querier is configured, fetches the current nym-nodes, pairs every advertised IP address of each node with that node's auxiliary `location`, and replaces `network_nodes` while holding `self.network.inner.write()`. With no querier it does nothing and returns `Ok(())`; a failed query is propagated with `?`. */ async fn refresh_network_nodes(&mut self) -> Result<()> { + if let Some(querier) = &self.querier { + let nodes = querier.current_nymnodes().await?; + + // collect all known/allowed nodes information + let known_nodes = nodes + .iter() + .flat_map(|n| { + n.description + .host_information + .ip_address + .clone() + .into_iter() + .zip(std::iter::repeat(n.description.auxiliary_details.location)) + }) + .collect::>(); + + let mut network_guard = self.network.inner.write().await; + network_guard.network_nodes = known_nodes; + } + + Ok(()) + } + + /* Returns a clone of the `NetworkView` held by this refresher. */ pub(crate) fn network_view(&self) -> NetworkView { + self.network.clone() + } + + /* Refresh loop: re-runs `refresh_network_nodes` on every tick of `full_refresh_interval` until `shutdown_token` is cancelled. NOTE(review): `reset()` is called right after the interval is created - presumably to skip the immediate first tick, since the constructor above already performs an initial refresh; confirm that `interval` is `tokio::time::interval`. */ pub(crate) async fn run(&mut self) { + info!("NetworkRefresher started successfully"); + let mut full_refresh_interval = interval(self.full_refresh_interval); + full_refresh_interval.reset(); + + while !self.shutdown_token.is_cancelled() { + tokio::select!
{ + biased; + /* with `biased;` the branches are polled in the order written, so shutdown is checked before the tick; the loop itself ends through the `while` condition */ _ = self.shutdown_token.cancelled() => { + trace!("NetworkRefresher: Received shutdown"); + } + _ = full_refresh_interval.tick() => { + /* NOTE(review): only `is_err()` is checked, so the error value itself is never logged */ if self.refresh_network_nodes().await.is_err() { + warn!("Failed to refresh network nodes, we're gonna keep the same set"); + } + } + } + } + trace!("NetworkRefresher: Exiting"); + } + + /* Moves the refresher into a spawned tokio task that drives `run`; the handle returned by `tokio::spawn` is discarded. */ pub(crate) fn start(mut self) { + tokio::spawn(async move { self.run().await }); + } +} diff --git a/nym-statistics-api/src/storage/mod.rs b/nym-statistics-api/src/storage/mod.rs new file mode 100644 index 0000000000..fe14eedbea --- /dev/null +++ b/nym-statistics-api/src/storage/mod.rs @@ -0,0 +1,102 @@ +use anyhow::{anyhow, Result}; +use models::{ConnectionInfoDto, DailyActiveDeviceDto}; +use sqlx::{migrate::Migrator, postgres::PgConnectOptions}; +use std::str::FromStr; + +pub(crate) mod models; + +pub(crate) type DbPool = sqlx::PgPool; +static MIGRATOR: Migrator = sqlx::migrate!("./migrations"); + +#[derive(Debug, Clone)] +pub(crate) struct StatisticsStorage { + connection_pool: DbPool, +} + +impl StatisticsStorage { + /* Opens the pool and applies migrations: parses `connection_url` into connect options, overrides port, username and password, sets `bin_info!().binary_name` as the application name, connects, then runs `MIGRATOR` before returning the storage. Parse and migration errors are propagated with `?`; a connect error is wrapped in a message that embeds `connection_url`. */ pub async fn init( + connection_url: String, + user: String, + password: String, + port: u16, + ) -> Result { + let connect_options = PgConnectOptions::from_str(&connection_url)?
+ .port(port) + .username(&user) + .password(&password) + .application_name(nym_bin_common::bin_info!().binary_name); + + let pool = sqlx::PgPool::connect_with(connect_options) + .await + .map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?; + + MIGRATOR.run(&pool).await?; + + Ok(StatisticsStorage { + connection_pool: pool, + }) + } + + /* Stores one report: always inserts the device row, then the connection row when `connection_info` is `Some`. The first failing insert aborts with its error. NOTE(review): the two inserts are issued separately with no transaction visible here, and both callees take `&self`, so `&mut self` looks stricter than necessary. */ pub(crate) async fn store_vpn_client_report( + &mut self, + active_device: DailyActiveDeviceDto, + connection_info: Option, + ) -> Result<()> { + self.store_device(active_device).await?; + if let Some(connection_info) = connection_info { + self.store_connection_stats(connection_info).await?; + } + Ok(()) + } + + // gateway-storage uses the `chrono` feature of sqlx, and in 0.7.4 it takes priority over the `time` one once features are unified (e.g. when building from the workspace root), so the query! macro expects `chrono` types by default. + // NOTE(review): the functions below do call sqlx::query!, passing the date/time values as `as time::Date` / `as time::OffsetDateTime` - presumably type overrides that work around the above; confirm. + // NOTE(review): this comment used to say that query! cannot be used here and that the queries get no compile-time verification; that no longer matches the code, so verify which is current.
+ /* Inserts one row into `active_device`; if a row with the same (device_id, day) already exists the insert is skipped (`ON CONFLICT ... DO NOTHING`). `stats_id` is written to the `device_id` column and `os_arch` to `architecture`. */ async fn store_device(&self, active_device: DailyActiveDeviceDto) -> Result<()> { + sqlx::query!( + r#"INSERT INTO active_device ( + day, + device_id, + os_type, + os_version, + architecture, + app_version, + user_agent, + from_mixnet) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (device_id, day) DO NOTHING"#, + active_device.day as time::Date, + active_device.stats_id, + active_device.os_type, + active_device.os_version, + active_device.os_arch, + active_device.app_version, + active_device.user_agent, + active_device.from_mixnet + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } + + /* Inserts one row into `connection_stats`. `received_from` is written to the `source_ip` column, and `$1` is cast to `timestamptz` inside the statement. */ async fn store_connection_stats(&self, connection_info: ConnectionInfoDto) -> Result<()> { + sqlx::query!( + r#"INSERT INTO connection_stats ( + received_at, + connection_time_ms, + two_hop, + source_ip, + country_code, + from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)"#, + connection_info.received_at as time::OffsetDateTime, + connection_info.connection_time_ms, + connection_info.two_hop, + connection_info.received_from, + connection_info.country_code, + connection_info.from_mixnet + ) + .execute(&self.connection_pool) + .await?; + Ok(()) + } +} diff --git a/nym-statistics-api/src/storage/models.rs b/nym-statistics-api/src/storage/models.rs new file mode 100644 index 0000000000..5ac863dcd6 --- /dev/null +++ b/nym-statistics-api/src/storage/models.rs @@ -0,0 +1,69 @@ +use std::net::SocketAddr; + +use axum_extra::headers::UserAgent; +use celes::Country; +use nym_statistics_common::report::vpn_client::VpnClientStatsReport; +use time::{Date, OffsetDateTime}; + +pub type StatsId = String; + +/* Field values for one `active_device` insert: a day, the reporting device's stats id, and its OS/app description. */ #[derive(Debug, Clone, sqlx::FromRow)] +pub(crate) struct DailyActiveDeviceDto { + pub(crate) day: Date, + pub(crate) stats_id: StatsId, + pub(crate) os_type: String, + pub(crate) os_version: Option, + pub(crate) os_arch: String, + pub(crate) app_version: String, + pub(crate) user_agent: String, + pub(crate) from_mixnet: bool, +} + +impl DailyActiveDeviceDto { +
/* Builds the row from a received report: `day` is the date part of `received_at`, the id and the OS/app fields are cloned from the report, and `user_agent` is stored via `to_string()`. */ pub(crate) fn new( + received_at: OffsetDateTime, + stats_report: &VpnClientStatsReport, + user_agent: UserAgent, + from_mixnet: bool, + ) -> Self { + Self { + day: received_at.date(), + stats_id: stats_report.stats_id.clone(), + os_type: stats_report.static_information.os_type.clone(), + os_version: stats_report.static_information.os_version.clone(), + os_arch: stats_report.static_information.os_arch.clone(), + app_version: stats_report.static_information.app_version.clone(), + user_agent: user_agent.to_string(), + from_mixnet, + } + } +} + +/* Field values for one `connection_stats` insert. NOTE(review): several `Option` types in this copy have no type argument (here and in `DailyActiveDeviceDto`) - this looks like extraction damage rather than source; confirm against the repository. */ #[derive(Debug, Clone, sqlx::FromRow)] +pub(crate) struct ConnectionInfoDto { + pub(crate) received_at: OffsetDateTime, + pub(crate) received_from: String, + pub(crate) connection_time_ms: Option, + pub(crate) two_hop: bool, + pub(crate) country_code: Option, + pub(crate) from_mixnet: bool, +} + +impl ConnectionInfoDto { + /* Returns `None` when the report has no `basic_usage`. Otherwise keeps only the IP part of `received_from` (the port is dropped) and, when a country is given, its `alpha2` code. */ pub(crate) fn maybe_new( + received_at: OffsetDateTime, + stats_report: &VpnClientStatsReport, + received_from: SocketAddr, + maybe_country: Option, + from_mixnet: bool, + ) -> Option { + stats_report.basic_usage.as_ref().map(|usage_report| Self { + received_at, + received_from: received_from.ip().to_string(), + connection_time_ms: usage_report.connection_time_ms, + two_hop: usage_report.two_hop, + country_code: maybe_country.map(|c| c.alpha2.into()), + from_mixnet, + }) + } +}