Statistics API v2 (#6227)

* vpn client report v2

* report v2 support in nym-stats API

* version bump

* CI fix while we're at it

* more CI fix

* needed the dind after all

* PR comments
This commit is contained in:
Simon Wicky
2025-11-25 13:16:31 +01:00
committed by GitHub
parent c9ef46c51d
commit 6eb8f29235
14 changed files with 340 additions and 115 deletions
@@ -10,7 +10,7 @@ env:
jobs:
check-if-tag-exists:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Checkout repo
uses: actions/checkout@v4
+3 -3
View File
@@ -8,6 +8,6 @@ jobs:
steps:
- uses: actions/first-interaction@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
issue-message: 'Thank you for raising this issue'
pr-message: 'Thank you for making this first PR'
repo_token: ${{ secrets.GITHUB_TOKEN }}
issue_message: 'Thank you for raising this issue'
pr_message: 'Thank you for making this first PR'
Generated
+1 -2
View File
@@ -7197,7 +7197,7 @@ dependencies = [
[[package]]
name = "nym-statistics-api"
version = "0.2.1"
version = "0.3.0"
dependencies = [
"anyhow",
"axum",
@@ -7211,7 +7211,6 @@ dependencies = [
"nym-statistics-common",
"nym-task",
"nym-validator-client",
"serde",
"serde_json",
"sqlx",
"time",
+1 -1
View File
@@ -17,7 +17,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
time = { workspace = true, features = ["serde"] }
tokio = { workspace = true }
si-scale = { workspace = true }
strum = { workspace = true }
+49 -5
View File
@@ -2,9 +2,12 @@
// SPDX-License-Identifier: GPL-3.0-only
use serde::{Deserialize, Serialize};
use time::Date;
const KIND: &str = "vpn_client_stats_report";
const VERSION: &str = "v1";
const BASIC_REPORT_KIND: &str = "vpn_client_stats_report";
const SESSION_REPORT_KIND: &str = "vpn_client_session_report";
const VERSION_1: &str = "v1";
const VERSION_2: &str = "v2";
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -13,15 +16,14 @@ pub struct VpnClientStatsReport {
pub api_version: String,
pub stats_id: String,
pub static_information: StaticInformationReport,
//SW called it basic so we can swap it easily down the line for more data
pub basic_usage: Option<UsageReport>,
}
impl VpnClientStatsReport {
pub fn new(stats_id: String, static_information: StaticInformationReport) -> Self {
VpnClientStatsReport {
kind: KIND.into(),
api_version: VERSION.into(),
kind: BASIC_REPORT_KIND.into(),
api_version: VERSION_1.into(),
stats_id,
static_information,
basic_usage: None,
@@ -34,6 +36,34 @@ impl VpnClientStatsReport {
self
}
}
/// Version 2 of the statistics report sent by a VPN client.
///
/// Compared to `VpnClientStatsReport`, the optional `basic_usage` section is
/// replaced by a mandatory [`SessionReport`].
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VpnClientStatsReportV2 {
/// Report kind tag; `new` sets it to `"vpn_client_session_report"`.
pub kind: String,
/// Report format version; `new` sets it to `"v2"`.
// NOTE(review): both `kind` and `api_version` are plain strings, so a
// deserialized report can carry any value here — nothing in this type checks them.
pub api_version: String,
/// Identifier supplied by the client for this report.
pub stats_id: String,
/// Static information about the reporting client (OS, architecture, app version).
pub static_information: StaticInformationReport,
/// Data about the session being reported.
pub session_report: SessionReport,
}
impl VpnClientStatsReportV2 {
    /// Assembles a V2 report from its three caller-provided parts.
    ///
    /// `kind` and `api_version` are not caller-controlled: they are always
    /// filled from `SESSION_REPORT_KIND` and `VERSION_2` respectively.
    pub fn new(
        stats_id: String,
        static_information: StaticInformationReport,
        session_report: SessionReport,
    ) -> Self {
        let kind = String::from(SESSION_REPORT_KIND);
        let api_version = String::from(VERSION_2);

        Self {
            kind,
            api_version,
            stats_id,
            static_information,
            session_report,
        }
    }
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StaticInformationReport {
@@ -49,3 +79,17 @@ pub struct UsageReport {
pub connection_time_ms: Option<i32>,
pub two_hop: bool,
}
/// Per-session data carried by a [`VpnClientStatsReportV2`].
///
/// Every field is mandatory except `follow_up_id` and `error`.
// NOTE(review): units are implied only by the `_ms` / `_min` suffixes and the
// `_utc` suffix; the types are plain `i32` / `Date`, so nothing here enforces
// the unit, the timezone, or non-negative values — confirm with the client side.
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionReport {
/// Day on which the session started.
pub start_day_utc: Date,
/// Time taken to connect.
pub connection_time_ms: i32,
/// Tunnel type, as a free-form string.
pub tunnel_type: String,
/// Retry attempt counter for this session.
pub retry_attempt: i32,
/// Duration of the session.
pub session_duration_min: i32,
/// Time taken to disconnect.
pub disconnection_time_ms: i32,
/// Identifier of the exit node used for the session.
// The statistics API looks this value up in a map keyed by each node's
// base58-encoded ed25519 identity key, so it only matches a known node when
// it is given in that form.
pub exit_id: String,
/// Optional follow-up identifier.
// NOTE(review): what this links to is not visible here — TODO confirm.
pub follow_up_id: Option<String>,
/// Optional error description for the session.
pub error: Option<String>,
}
@@ -0,0 +1,33 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO report_v2(\n received_at,\n source_ip,\n from_mixnet,\n country_code,\n report_version,\n device_id,\n os_type,\n os_version,\n architecture,\n app_version,\n user_agent,\n start_day_utc,\n connection_time_ms,\n tunnel_type,\n retry_attempt,\n session_duration_min,\n disconnection_time_ms,\n exit_id,\n follow_up_id,\n error)\n VALUES ($1::timestamptz, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Text",
"Bool",
"Text",
"Text",
"Text",
"Text",
"Text",
"Text",
"Text",
"Text",
"Date",
"Int4",
"Text",
"Int4",
"Int4",
"Int4",
"Text",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "14d75cdd34201313e34ae7f0b931c9df43603232e3be42b0573013cd74226518"
}
@@ -1,19 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO connection_stats (\n received_at,\n connection_time_ms,\n two_hop,\n source_ip,\n country_code,\n from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Int4",
"Bool",
"Text",
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "dce9f3dae7ae0dc5953d1f69e843bea9553fb05a2f656cfff6598f12a21c99ba"
}
+2 -3
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-statistics-api"
version = "0.2.1"
version = "0.3.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -20,14 +20,13 @@ axum-client-ip.workspace = true
axum-extra = { workspace = true, features = ["typed-header"] }
celes.workspace = true
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"postgres",
"time",
] }
time.workspace = true
time = { workspace = true, features = ["serde-human-readable"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util.workspace = true
tracing.workspace = true
@@ -0,0 +1,29 @@
CREATE TABLE report_v2 (
-- some info about the report, inferred from when/from where we got it
received_at TIMESTAMP WITH TIME ZONE NOT NULL,
source_ip TEXT,
from_mixnet BOOLEAN,
country_code TEXT,
report_version TEXT,
-- some infos about the device sending the report
device_id TEXT NOT NULL,
os_type TEXT,
os_version TEXT,
architecture TEXT,
app_version TEXT,
user_agent TEXT,
-- session info
start_day_utc DATE,
connection_time_ms INTEGER,
tunnel_type TEXT,
retry_attempt INTEGER,
session_duration_min INTEGER,
disconnection_time_ms INTEGER,
exit_id TEXT,
follow_up_id TEXT,
error TEXT
);
CREATE INDEX idx_report_v2_received_at ON report_v2 (received_at);
+1 -1
View File
@@ -24,7 +24,7 @@ impl RouterBuilder {
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }), // SW let's redirect to a blogpost explaining the stats collection process once it exists
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest("/v1", Router::new().nest("/stats", stats::routes()));
+65 -14
View File
@@ -1,7 +1,7 @@
use axum::{Json, Router, extract::State};
use axum_client_ip::InsecureClientIp;
use axum_extra::{TypedHeader, headers::UserAgent};
use nym_statistics_common::report::vpn_client::VpnClientStatsReport;
use nym_statistics_common::report::vpn_client::{VpnClientStatsReport, VpnClientStatsReportV2};
use tracing::debug;
use crate::{
@@ -9,11 +9,13 @@ use crate::{
error::{HttpError, HttpResult},
state::AppState,
},
storage::models::{ConnectionInfoDto, DailyActiveDeviceDto, StatsReportV1Dto},
storage::models::{DailyActiveDeviceDto, StatsReportV1Dto, StatsReportV2Dto},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/report", axum::routing::post(submit_stats_report))
Router::new()
.route("/report", axum::routing::post(submit_stats_report))
.route("/session", axum::routing::post(submit_session_report))
}
#[utoipa::path(
@@ -44,19 +46,11 @@ async fn submit_stats_report(
let maybe_location = gateway_record.unwrap_or_default();
if from_mixnet {
debug!("Received a report from the network");
debug!("Received a V1 report from the network");
} else {
debug!("Received a report from outside of the network");
debug!("Received a V1 report from outside of the network");
}
let active_device = DailyActiveDeviceDto::new(now, &report, user_agent.clone(), from_mixnet);
let maybe_connection_info = ConnectionInfoDto::maybe_new(
now,
&report,
insecure_ip_addr.0,
maybe_location,
from_mixnet,
);
let stats_report = StatsReportV1Dto::new(
now,
@@ -75,7 +69,64 @@ async fn submit_stats_report(
state
.storage()
.store_legacy_vpn_client_report(active_device, maybe_connection_info)
.store_active_device(active_device)
.await
.map_err(HttpError::internal_with_logging)?;
Ok(Json(()))
}
#[utoipa::path(
post,
request_body = VpnClientStatsReportV2,
tag = "Stats",
path = "/session",
context_path = "/v1/stats",
responses(
(status = 200)
)
)]
#[tracing::instrument(level = "info", skip_all)]
async fn submit_session_report(
State(mut state): State<AppState>,
TypedHeader(user_agent): TypedHeader<UserAgent>,
insecure_ip_addr: InsecureClientIp, // This is the reverse proxy IP for now, but maybe in the future?
Json(report): Json<VpnClientStatsReportV2>,
) -> HttpResult<Json<()>> {
let now = time::OffsetDateTime::now_utc();
let gateway_record = state
.network_view()
.get_country_by_id(&report.session_report.exit_id)
.await;
let from_mixnet = gateway_record.is_some();
let maybe_location = gateway_record.unwrap_or_default();
if from_mixnet {
debug!("Received a V2 report from the network");
} else {
debug!("Received a V2 report from outside of the network");
}
let active_device = DailyActiveDeviceDto::new_v2(now, &report, user_agent.clone(), from_mixnet);
let stats_report = StatsReportV2Dto::new(
now,
&report,
user_agent,
from_mixnet,
insecure_ip_addr.0,
maybe_location,
);
state
.storage()
.store_active_device(active_device)
.await
.map_err(HttpError::internal_with_logging)?;
state
.storage()
.store_vpn_client_report_v2(stats_report)
.await
.map_err(HttpError::internal_with_logging)?;
+22 -5
View File
@@ -20,6 +20,7 @@ use tracing::{error, info, trace, warn};
const NETWORK_CACHE_TTL: Duration = Duration::from_secs(600);
type IpToCountryMap = HashMap<IpAddr, Option<Country>>;
type IdToCountryMap = HashMap<String, Option<Country>>;
// SW this should use a proper NS API client once it exists
struct NodesQuerier {
@@ -45,19 +46,24 @@ impl NetworkView {
fn new_empty() -> Self {
NetworkView {
inner: Arc::new(RwLock::new(NetworkViewInner {
network_nodes: HashMap::new(),
ip_to_country: HashMap::new(),
id_to_country: HashMap::new(),
})),
}
}
pub(crate) async fn get_country_by_ip(&self, ip_addr: &IpAddr) -> Option<Option<Country>> {
self.inner.read().await.network_nodes.get(ip_addr).copied()
self.inner.read().await.ip_to_country.get(ip_addr).copied()
}
/// Looks up the country recorded for the node with the given identity key.
///
/// The outer `Option` is `None` when `id_key` is not present in the current
/// view at all; the inner `Option` is `None` when the node is present but has
/// no location recorded.
pub(crate) async fn get_country_by_id(&self, id_key: &str) -> Option<Option<Country>> {
    let view = self.inner.read().await;
    view.id_to_country.get(id_key).copied()
}
}
#[derive(Debug)]
struct NetworkViewInner {
network_nodes: IpToCountryMap,
ip_to_country: IpToCountryMap,
id_to_country: IdToCountryMap,
}
pub struct NetworkRefresher {
@@ -112,7 +118,7 @@ impl NetworkRefresher {
let nodes = querier.current_nymnodes().await?;
// collect all known/allowed nodes information
let known_nodes = nodes
let ip_to_country = nodes
.iter()
.flat_map(|n| {
n.description
@@ -124,8 +130,19 @@ impl NetworkRefresher {
})
.collect::<HashMap<_, _>>();
let id_to_country = nodes
.iter()
.map(|n| {
(
n.ed25519_identity_key().to_base58_string(),
n.description.auxiliary_details.location,
)
})
.collect::<HashMap<_, _>>();
let mut network_guard = self.network.inner.write().await;
network_guard.network_nodes = known_nodes;
network_guard.ip_to_country = ip_to_country;
network_guard.id_to_country = id_to_country;
}
Ok(())
+56 -34
View File
@@ -1,5 +1,4 @@
use anyhow::{Result, anyhow};
use models::{ConnectionInfoDto, DailyActiveDeviceDto};
use sqlx::{
Executor,
migrate::Migrator,
@@ -7,7 +6,7 @@ use sqlx::{
};
use std::{path::PathBuf, str::FromStr};
use crate::storage::models::StatsReportV1Dto;
use crate::storage::models::{DailyActiveDeviceDto, StatsReportV1Dto, StatsReportV2Dto};
pub(crate) mod models;
@@ -62,19 +61,10 @@ impl StatisticsStorage {
})
}
pub(crate) async fn store_legacy_vpn_client_report(
&mut self,
pub(crate) async fn store_active_device(
&self,
active_device: DailyActiveDeviceDto,
connection_info: Option<ConnectionInfoDto>,
) -> Result<()> {
self.store_device(active_device).await?;
if let Some(connection_info) = connection_info {
self.store_connection_stats(connection_info).await?;
}
Ok(())
}
async fn store_device(&self, active_device: DailyActiveDeviceDto) -> Result<()> {
sqlx::query!(
r#"INSERT INTO active_device (
day,
@@ -101,27 +91,6 @@ impl StatisticsStorage {
Ok(())
}
async fn store_connection_stats(&self, connection_info: ConnectionInfoDto) -> Result<()> {
sqlx::query!(
r#"INSERT INTO connection_stats (
received_at,
connection_time_ms,
two_hop,
source_ip,
country_code,
from_mixnet) VALUES ($1::timestamptz, $2, $3, $4, $5, $6)"#,
connection_info.received_at as time::OffsetDateTime,
connection_info.connection_time_ms,
connection_info.two_hop,
connection_info.received_from,
connection_info.country_code,
connection_info.from_mixnet
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn store_vpn_client_report(
&mut self,
report_v1: StatsReportV1Dto,
@@ -158,4 +127,57 @@ impl StatisticsStorage {
.await?;
Ok(())
}
/// Inserts a single V2 client report as one row of the `report_v2` table.
///
/// The 20 values are bound positionally, in the same order as the column
/// list. Three DTO fields are stored under differently named columns:
/// `received_from` -> `source_ip`, `stats_id` -> `device_id` and
/// `os_arch` -> `architecture`.
///
/// # Errors
///
/// Returns an error if executing the insert on the connection pool fails.
pub(crate) async fn store_vpn_client_report_v2(
&self,
report_v2: StatsReportV2Dto,
) -> Result<()> {
// NOTE(review): a cached description of this exact query text (the JSON
// entry with `"db_name": "PostgreSQL"` and a content hash) is committed next
// to this change; editing the SQL below presumably requires regenerating it.
sqlx::query!(
r#"INSERT INTO report_v2(
received_at,
source_ip,
from_mixnet,
country_code,
report_version,
device_id,
os_type,
os_version,
architecture,
app_version,
user_agent,
start_day_utc,
connection_time_ms,
tunnel_type,
retry_attempt,
session_duration_min,
disconnection_time_ms,
exit_id,
follow_up_id,
error)
VALUES ($1::timestamptz, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)"#,
// Bind values, in the same order as the column list above ($1..$20).
report_v2.received_at as time::OffsetDateTime,
report_v2.received_from,
report_v2.from_mixnet,
report_v2.country_code,
report_v2.report_version,
report_v2.stats_id,
report_v2.os_type,
report_v2.os_version,
report_v2.os_arch,
report_v2.app_version,
report_v2.user_agent,
report_v2.start_day_utc,
report_v2.connection_time_ms,
report_v2.tunnel_type,
report_v2.retry_attempt,
report_v2.session_duration_min,
report_v2.disconnection_time_ms,
report_v2.exit_id,
report_v2.follow_up_id,
report_v2.error
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+77 -27
View File
@@ -2,7 +2,7 @@ use std::net::IpAddr;
use axum_extra::headers::UserAgent;
use celes::Country;
use nym_statistics_common::report::vpn_client::VpnClientStatsReport;
use nym_statistics_common::report::vpn_client::{VpnClientStatsReport, VpnClientStatsReportV2};
use time::{Date, OffsetDateTime};
pub type StatsId = String;
@@ -37,38 +37,25 @@ impl DailyActiveDeviceDto {
from_mixnet,
}
}
}
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct ConnectionInfoDto {
pub(crate) received_at: OffsetDateTime,
pub(crate) received_from: String,
pub(crate) connection_time_ms: Option<i32>,
pub(crate) two_hop: bool,
pub(crate) country_code: Option<String>,
pub(crate) from_mixnet: bool,
}
impl ConnectionInfoDto {
pub(crate) fn maybe_new(
pub(crate) fn new_v2(
received_at: OffsetDateTime,
stats_report: &VpnClientStatsReport,
received_from: IpAddr,
maybe_country: Option<Country>,
stats_report: &VpnClientStatsReportV2,
user_agent: UserAgent,
from_mixnet: bool,
) -> Option<Self> {
stats_report.basic_usage.as_ref().map(|usage_report| Self {
received_at,
received_from: received_from.to_string(),
connection_time_ms: usage_report.connection_time_ms,
two_hop: usage_report.two_hop,
country_code: maybe_country.map(|c| c.alpha2.into()),
) -> Self {
Self {
day: received_at.date(),
stats_id: stats_report.stats_id.clone(),
os_type: stats_report.static_information.os_type.clone(),
os_version: stats_report.static_information.os_version.clone(),
os_arch: stats_report.static_information.os_arch.clone(),
app_version: stats_report.static_information.app_version.clone(),
user_agent: user_agent.to_string(),
from_mixnet,
})
}
}
}
// New structure. The two above will be removed when it is confirmed to work
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct StatsReportV1Dto {
pub(crate) received_at: OffsetDateTime,
@@ -116,3 +103,66 @@ impl StatsReportV1Dto {
report
}
}
/// Flattened form of a V2 client report, holding exactly the values that are
/// written to one row of the `report_v2` table.
///
/// It combines metadata inferred by the server on reception with the device
/// and session information taken from the submitted report.
// NOTE(review): three fields are stored under differently named columns
// (`received_from` -> `source_ip`, `stats_id` -> `device_id`,
// `os_arch` -> `architecture`). Only the insert path is visible here; if rows
// are ever read back through the derived `FromRow`, those names would
// presumably need explicit renames — verify before adding a read path.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct StatsReportV2Dto {
// Report metadata
// Time at which the server received the report.
pub(crate) received_at: OffsetDateTime,
// Textual form of the IP address the report was received from.
pub(crate) received_from: String,
// Whether the report's exit id matched a node known to the server.
pub(crate) from_mixnet: bool,
// Alpha-2 code of the country resolved for that node, if any.
pub(crate) country_code: Option<String>,
// `api_version` string exactly as sent in the report.
pub(crate) report_version: String,
// Device info
pub(crate) stats_id: StatsId,
pub(crate) os_type: String,
pub(crate) os_version: Option<String>,
pub(crate) os_arch: String,
pub(crate) app_version: String,
// Textual form of the request's `User-Agent` header.
pub(crate) user_agent: String,
// session info
pub(crate) start_day_utc: Date,
pub(crate) connection_time_ms: i32,
pub(crate) tunnel_type: String,
pub(crate) retry_attempt: i32,
pub(crate) session_duration_min: i32,
pub(crate) disconnection_time_ms: i32,
pub(crate) exit_id: String,
pub(crate) follow_up_id: Option<String>,
pub(crate) error: Option<String>,
}
impl StatsReportV2Dto {
    /// Flattens a received V2 report and its reception metadata into a DTO.
    ///
    /// * `received_at` - time the report was received.
    /// * `stats_report` - the submitted report; its fields are cloned, the
    ///   report itself is left untouched.
    /// * `user_agent` - request user agent, stored in its string form.
    /// * `from_mixnet` - whether the report was matched to a known node.
    /// * `received_from` - source IP address, stored in its string form.
    /// * `maybe_country` - resolved country, stored as its alpha-2 code.
    pub(crate) fn new(
        received_at: OffsetDateTime,
        stats_report: &VpnClientStatsReportV2,
        user_agent: UserAgent,
        from_mixnet: bool,
        received_from: IpAddr,
        maybe_country: Option<Country>,
    ) -> Self {
        // Short handles on the two nested sections of the incoming report.
        let device = &stats_report.static_information;
        let session = &stats_report.session_report;

        let country_code: Option<String> = maybe_country.map(|country| country.alpha2.into());

        Self {
            // Report metadata
            received_at,
            received_from: received_from.to_string(),
            from_mixnet,
            country_code,
            report_version: stats_report.api_version.clone(),
            // Device info
            stats_id: stats_report.stats_id.clone(),
            os_type: device.os_type.clone(),
            os_version: device.os_version.clone(),
            os_arch: device.os_arch.clone(),
            app_version: device.app_version.clone(),
            user_agent: user_agent.to_string(),
            // Session info
            start_day_utc: session.start_day_utc,
            connection_time_ms: session.connection_time_ms,
            tunnel_type: session.tunnel_type.clone(),
            retry_attempt: session.retry_attempt,
            session_duration_min: session.session_duration_min,
            disconnection_time_ms: session.disconnection_time_ms,
            exit_id: session.exit_id.clone(),
            follow_up_id: session.follow_up_id.clone(),
            error: session.error.clone(),
        }
    }
}