Compare commits

...

9 Commits

Author SHA1 Message Date
Simon Wicky 4a2d9f8203 add forgetme option 2025-01-28 11:52:01 +01:00
Simon Wicky ee51cd4b67 changing debug level, because debug is too verbose 2025-01-28 11:32:33 +01:00
Simon Wicky eb1a4c176c explicit disabling of stats reporting for the stats collector 2025-01-28 11:32:33 +01:00
Simon Wicky c31f449afd Add support for custom report database path in statistics collector 2025-01-28 11:32:33 +01:00
Simon Wicky 3a770a5af9 graceful shutdown for stats collector 2025-01-28 11:32:33 +01:00
Simon Wicky c378c033cb improve logging 2025-01-28 11:32:33 +01:00
Simon Wicky eb78ac4087 Enhance mixnet client configuration and add report database path method 2025-01-28 11:32:32 +01:00
Simon Wicky 932c2cbe43 add storage to stats collector 2025-01-28 11:32:32 +01:00
Simon Wicky 73a036a3b9 Add statistics collector service provider with initial implementation 2025-01-28 11:31:17 +01:00
38 changed files with 1643 additions and 29 deletions
Generated
+29
View File
@@ -6817,6 +6817,35 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "nym-statistics-collector"
version = "0.1.0"
dependencies = [
"anyhow",
"bs58",
"clap",
"futures",
"log",
"nym-bin-common",
"nym-client-core",
"nym-config",
"nym-crypto",
"nym-id",
"nym-network-defaults",
"nym-sdk",
"nym-service-providers-common",
"nym-sphinx",
"nym-statistics-common",
"nym-task",
"nym-types",
"serde",
"serde_json",
"sqlx",
"thiserror 1.0.69",
"tokio",
"url",
]
[[package]]
name = "nym-statistics-common"
version = "0.1.0"
+19 -6
View File
@@ -129,6 +129,23 @@ members = [
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"service-providers/statistics-collector",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
@@ -166,15 +183,11 @@ default-members = [
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"service-providers/statistics-collector",
"tools/nymvisor",
]
exclude = [
"explorer",
"contracts",
"nym-wallet",
"cpu-cycles",
]
exclude = ["explorer", "contracts", "nym-wallet", "cpu-cycles"]
[workspace.package]
authors = ["Nym Technologies SA"]
@@ -88,6 +88,7 @@ impl StatisticsControl {
log::error!("Failed to report client stats: {:?}", err);
} else {
self.stats.reset();
log::debug!("Stats report successfully sent");
}
}
+7 -7
View File
@@ -7,16 +7,16 @@ use nym_credentials_interface::TicketType;
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ConnectionStats {
pub struct ConnectionStats {
//tickets
mixnet_entry_spent: u32,
vpn_entry_spent: u32,
mixnet_exit_spent: u32,
vpn_exit_spent: u32,
pub mixnet_entry_spent: u32,
pub vpn_entry_spent: u32,
pub mixnet_exit_spent: u32,
pub vpn_exit_spent: u32,
//country_connection
wg_exit_country_code: String,
mix_exit_country_code: String,
pub wg_exit_country_code: String,
pub mix_exit_country_code: String,
}
/// Event space for Nym API statistics tracking
@@ -12,7 +12,7 @@ use nym_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct GatewayStats {
pub struct GatewayStats {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
@@ -12,7 +12,7 @@ use nym_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct NymApiStats {
pub struct NymApiStats {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
@@ -61,7 +61,7 @@ impl Instant {
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct PacketStatistics {
pub struct PacketStatistics {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
+13 -13
View File
@@ -18,16 +18,16 @@ const VERSION: &str = "v1";
/// Report object containing both data to be reported and client / device context. We take extra care not to overcapture context information.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClientStatsReport {
pub(crate) kind: String,
pub(crate) api_version: String,
pub(crate) last_update_time: OffsetDateTime,
pub(crate) client_id: String,
pub(crate) client_type: String,
pub(crate) os_information: OsInformation,
pub(crate) packet_stats: PacketStatistics,
pub(crate) gateway_conn_stats: GatewayStats,
pub(crate) nym_api_stats: NymApiStats,
pub(crate) connection_stats: ConnectionStats,
pub kind: String,
pub api_version: String,
pub last_update_time: OffsetDateTime,
pub client_id: String,
pub client_type: String,
pub os_information: OsInformation,
pub packet_stats: PacketStatistics,
pub gateway_conn_stats: GatewayStats,
pub nym_api_stats: NymApiStats,
pub connection_stats: ConnectionStats,
}
impl From<ClientStatsReport> for Vec<u8> {
@@ -65,9 +65,9 @@ impl Default for ClientStatsReport {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OsInformation {
pub(crate) os_type: String,
pub(crate) os_version: Option<String>,
pub(crate) os_arch: String,
pub os_type: String,
pub os_version: Option<String>,
pub os_arch: String,
}
impl OsInformation {
@@ -0,0 +1,53 @@
[package]
name = "nym-statistics-collector"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
anyhow = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
futures = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"time",
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net"] }
url = { workspace = true }
nym-bin-common = { path = "../../common/bin-common", features = [
"clap",
"output_format",
] }
nym-client-core = { path = "../../common/client-core", features = ["cli"] }
nym-config = { path = "../../common/config" }
nym-crypto = { path = "../../common/crypto" }
nym-id = { path = "../../common/nym-id" }
nym-network-defaults = { path = "../../common/network-defaults" }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nym-service-providers-common = { path = "../common" }
nym-statistics-common = { path = "../../common/statistics" }
nym-sphinx = { path = "../../common/nymsphinx" }
nym-task = { path = "../../common/task" }
nym-types = { path = "../../common/types" }
[build-dependencies]
tokio = { workspace = true, features = ["macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
@@ -0,0 +1,28 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#[tokio::main]
async fn main() {
use sqlx::{Connection, SqliteConnection};
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{out_dir}/stats-collector-example.sqlite");
let mut conn = SqliteConnection::connect(&format!("sqlite://{database_path}?mode=rwc"))
.await
.expect("Failed to create SQLx database connection");
sqlx::migrate!("./migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
@@ -0,0 +1,23 @@
CREATE TABLE report (
day DATE NOT NULL,
client_id TEXT NOT NULL,
client_type TEXT,
os_type TEXT,
os_version TEXT,
architecture TEXT,
PRIMARY KEY (client_id, day)
);
CREATE TABLE connection_stats (
received_at TIMESTAMP WITH TIME ZONE NOT NULL,
client_id TEXT NOT NULL,
mixnet_entry_spent INTEGER,
vpn_entry_spent INTEGER,
mixnet_exit_spent INTEGER,
vpn_exit_spent INTEGER,
wg_exit_country_code TEXT,
mix_exit_country_code TEXT,
PRIMARY KEY (client_id, received_at)
);
@@ -0,0 +1,30 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_add_gateway::{add_gateway, CommonClientAddGatewayArgs};
use nym_statistics_collector::error::StatsCollectorError;
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonClientAddGatewayArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonClientAddGatewayArgs> for Args {
fn as_ref(&self) -> &CommonClientAddGatewayArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), StatsCollectorError> {
let output = args.output;
let res = add_gateway::<CliStatsCollectorClient, _>(args, None).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use clap::Args;
use nym_bin_common::bin_info_owned;
use nym_bin_common::output_format::OutputFormat;
#[derive(Args)]
pub(crate) struct BuildInfo {
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
pub(crate) fn execute(args: BuildInfo) {
println!("{}", args.output.format(&bin_info_owned!()))
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_statistics_collector::error::StatsCollectorError;
use nym_client_core::cli_helpers::client_import_coin_index_signatures::{
import_coin_index_signatures, CommonClientImportCoinIndexSignaturesArgs,
};
pub(crate) async fn execute(
args: CommonClientImportCoinIndexSignaturesArgs,
) -> Result<(), StatsCollectorError> {
import_coin_index_signatures::<CliStatsCollectorClient, _>(args).await?;
println!("successfully imported coin index signatures!");
Ok(())
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_statistics_collector::error::StatsCollectorError;
use nym_client_core::cli_helpers::client_import_credential::{
import_credential, CommonClientImportTicketBookArgs,
};
pub async fn execute(args: CommonClientImportTicketBookArgs) -> Result<(), StatsCollectorError> {
import_credential::<CliStatsCollectorClient, _>(args).await?;
println!("successfully imported credential!");
Ok(())
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_statistics_collector::error::StatsCollectorError;
use nym_client_core::cli_helpers::client_import_expiration_date_signatures::{
import_expiration_date_signatures, CommonClientImportExpirationDateSignaturesArgs,
};
pub(crate) async fn execute(
args: CommonClientImportExpirationDateSignaturesArgs,
) -> Result<(), StatsCollectorError> {
import_expiration_date_signatures::<CliStatsCollectorClient, _>(args).await?;
println!("successfully imported expiration date signatures!");
Ok(())
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_statistics_collector::error::StatsCollectorError;
use nym_client_core::cli_helpers::client_import_master_verification_key::{
import_master_verification_key, CommonClientImportMasterVerificationKeyArgs,
};
pub(crate) async fn execute(
args: CommonClientImportMasterVerificationKeyArgs,
) -> Result<(), StatsCollectorError> {
import_master_verification_key::<CliStatsCollectorClient, _>(args).await?;
println!("successfully imported master verification key!");
Ok(())
}
@@ -0,0 +1,59 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use clap::{Args, Subcommand};
use nym_statistics_collector::error::StatsCollectorError;
use nym_client_core::cli_helpers::client_import_coin_index_signatures::CommonClientImportCoinIndexSignaturesArgs;
use nym_client_core::cli_helpers::client_import_credential::CommonClientImportTicketBookArgs;
use nym_client_core::cli_helpers::client_import_expiration_date_signatures::CommonClientImportExpirationDateSignaturesArgs;
use nym_client_core::cli_helpers::client_import_master_verification_key::CommonClientImportMasterVerificationKeyArgs;
pub(crate) mod import_coin_index_signatures;
pub(crate) mod import_credential;
pub(crate) mod import_expiration_date_signatures;
pub(crate) mod import_master_verification_key;
pub(crate) mod show_ticketbooks;
#[derive(Args)]
#[clap(args_conflicts_with_subcommands = true, subcommand_required = true)]
pub struct Ecash {
#[clap(subcommand)]
pub command: EcashCommands,
}
impl Ecash {
pub async fn execute(self) -> Result<(), StatsCollectorError> {
match self.command {
EcashCommands::ShowTicketBooks(args) => show_ticketbooks::execute(args).await?,
EcashCommands::ImportTicketBook(args) => import_credential::execute(args).await?,
EcashCommands::ImportCoinIndexSignatures(args) => {
import_coin_index_signatures::execute(args).await?
}
EcashCommands::ImportExpirationDateSignatures(args) => {
import_expiration_date_signatures::execute(args).await?
}
EcashCommands::ImportMasterVerificationKey(args) => {
import_master_verification_key::execute(args).await?
}
}
Ok(())
}
}
#[derive(Subcommand)]
pub enum EcashCommands {
/// Display information associated with the imported ticketbooks,
ShowTicketBooks(show_ticketbooks::Args),
/// Import a pre-generated ticketbook
ImportTicketBook(CommonClientImportTicketBookArgs),
/// Import coin index signatures needed for ticketbooks
ImportCoinIndexSignatures(CommonClientImportCoinIndexSignaturesArgs),
/// Import expiration date signatures needed for ticketbooks
ImportExpirationDateSignatures(CommonClientImportExpirationDateSignaturesArgs),
/// Import master verification key needed for ticketbooks
ImportMasterVerificationKey(CommonClientImportMasterVerificationKeyArgs),
}
@@ -0,0 +1,32 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_statistics_collector::error::StatsCollectorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_show_ticketbooks::{
show_ticketbooks, CommonShowTicketbooksArgs,
};
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonShowTicketbooksArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonShowTicketbooksArgs> for Args {
fn as_ref(&self) -> &CommonShowTicketbooksArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), StatsCollectorError> {
let output = args.output;
let res = show_ticketbooks::<CliStatsCollectorClient, _>(args).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -0,0 +1,105 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::{override_config, CliStatsCollectorClient, OverrideConfig};
use clap::Args;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_init::{
initialise_client, CommonClientInitArgs, InitResultsWithConfig, InitialisableClient,
};
use nym_statistics_collector::{
config::{default_config_directory, default_config_filepath, default_data_directory, Config},
error::StatsCollectorError,
};
use serde::Serialize;
use std::{fmt::Display, fs, path::PathBuf};
impl InitialisableClient for CliStatsCollectorClient {
type InitArgs = Init;
fn initialise_storage_paths(id: &str) -> Result<(), Self::Error> {
fs::create_dir_all(default_data_directory(id))?;
fs::create_dir_all(default_config_directory(id))?;
Ok(())
}
fn default_config_path(id: &str) -> PathBuf {
default_config_filepath(id)
}
fn construct_config(init_args: &Self::InitArgs) -> Self::Config {
override_config(
Config::new(&init_args.common_args.id),
OverrideConfig::from(init_args.clone()),
)
}
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Init {
#[command(flatten)]
common_args: CommonClientInitArgs,
#[clap(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
/// Custom path to the report storage database
#[clap(long)]
report_database_path: Option<PathBuf>,
}
impl From<Init> for OverrideConfig {
fn from(init_config: Init) -> Self {
OverrideConfig {
nym_apis: init_config.common_args.nym_apis,
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
report_database_path: init_config.report_database_path,
}
}
}
impl AsRef<CommonClientInitArgs> for Init {
fn as_ref(&self) -> &CommonClientInitArgs {
&self.common_args
}
}
#[derive(Debug, Serialize)]
pub struct InitResults {
#[serde(flatten)]
client_core: nym_client_core::init::types::InitResults,
client_address: String,
}
impl InitResults {
fn new(res: InitResultsWithConfig<Config>) -> Self {
Self {
client_address: res.init_results.address.to_string(),
client_core: res.init_results,
}
}
}
impl Display for InitResults {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "{}", self.client_core)?;
write!(
f,
"Address of this statistics collector: {}",
self.client_address
)
}
}
pub(crate) async fn execute(args: Init) -> Result<(), StatsCollectorError> {
eprintln!("Initialising client...");
let output = args.output;
let res = initialise_client::<CliStatsCollectorClient>(args, None).await?;
let init_results = InitResults::new(res);
println!("{}", output.format(&init_results));
Ok(())
}
@@ -0,0 +1,32 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_list_gateways::{
list_gateways, CommonClientListGatewaysArgs,
};
use nym_statistics_collector::error::StatsCollectorError;
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonClientListGatewaysArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonClientListGatewaysArgs> for Args {
fn as_ref(&self) -> &CommonClientListGatewaysArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), StatsCollectorError> {
let output = args.output;
let res = list_gateways::<CliStatsCollectorClient, _>(args).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -0,0 +1,171 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::ecash::Ecash;
use clap::{CommandFactory, Parser, Subcommand};
use log::error;
use nym_bin_common::bin_info;
use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client_core::cli_helpers::CliClient;
use nym_config::OptionalSet;
use nym_statistics_collector::{
config::{helpers::try_upgrade_config, BaseClientConfig, Config},
error::StatsCollectorError,
};
use std::path::PathBuf;
use std::sync::OnceLock;
mod add_gateway;
mod build_info;
pub mod ecash;
mod init;
mod list_gateways;
mod run;
mod sign;
mod switch_gateway;
pub(crate) struct CliStatsCollectorClient;
impl CliClient for CliStatsCollectorClient {
const NAME: &'static str = "statistics-collector";
type Error = StatsCollectorError;
type Config = Config;
async fn try_upgrade_outdated_config(id: &str) -> Result<(), Self::Error> {
try_upgrade_config(id).await
}
async fn try_load_current_config(id: &str) -> Result<Self::Config, Self::Error> {
try_load_current_config(id).await
}
}
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser)]
#[command(author = "Nymtech", version, about, long_version = pretty_build_info_static())]
pub(crate) struct Cli {
/// Path pointing to an env file that configures the client.
#[arg(short, long)]
pub(crate) config_env_file: Option<std::path::PathBuf>,
/// Flag used for disabling the printed banner in tty.
#[arg(long)]
pub(crate) no_banner: bool,
#[command(subcommand)]
command: Commands,
}
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub(crate) enum Commands {
/// Initialize a stats collector. Do this first!
Init(init::Init),
/// Run the stats collector with the provided configuration and optionally override
/// parameters.
Run(run::Run),
/// Ecash-related functionalities
Ecash(Ecash),
/// List all registered with gateways
ListGateways(list_gateways::Args),
/// Add new gateway to this client
AddGateway(add_gateway::Args),
/// Change the currently active gateway. Note that you must have already registered with the new gateway!
SwitchGateway(switch_gateway::Args),
/// Sign to prove ownership of this stats collector
Sign(sign::Sign),
/// Show build information of this binary
BuildInfo(build_info::BuildInfo),
/// Generate shell completions
Completions(ArgShell),
/// Generate Fig specification
GenerateFigSpec,
}
// Configuration that can be overridden.
pub(crate) struct OverrideConfig {
nym_apis: Option<Vec<url::Url>>,
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
report_database_path: Option<PathBuf>,
}
pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
config
.with_optional_base_custom_env(
BaseClientConfig::with_custom_nym_apis,
args.nym_apis,
nym_network_defaults::var_names::NYM_API,
nym_config::parse_urls,
)
.with_optional_base_custom_env(
BaseClientConfig::with_custom_nyxd,
args.nyxd_urls,
nym_network_defaults::var_names::NYXD,
nym_config::parse_urls,
)
.with_optional_base(
BaseClientConfig::with_disabled_credentials,
args.enabled_credentials_mode.map(|b| !b),
)
.with_optional(Config::with_report_database_path, args.report_database_path)
}
pub(crate) async fn execute(args: Cli) -> Result<(), StatsCollectorError> {
let bin_name = "nym-statistics-collector";
match args.command {
Commands::Init(m) => init::execute(m).await?,
Commands::Run(m) => run::execute(&m).await?,
Commands::Ecash(ecash) => ecash.execute().await?,
Commands::ListGateways(args) => list_gateways::execute(args).await?,
Commands::AddGateway(args) => add_gateway::execute(args).await?,
Commands::SwitchGateway(args) => switch_gateway::execute(args).await?,
Commands::Sign(m) => sign::execute(&m).await?,
Commands::BuildInfo(m) => build_info::execute(m),
Commands::Completions(s) => s.generate(&mut Cli::command(), bin_name),
Commands::GenerateFigSpec => fig_generate(&mut Cli::command(), bin_name),
}
Ok(())
}
async fn try_load_current_config(id: &str) -> Result<Config, StatsCollectorError> {
// try to load the config as is
if let Ok(cfg) = Config::read_from_default_path(id) {
return if !cfg.validate() {
Err(StatsCollectorError::ConfigValidationFailure)
} else {
Ok(cfg)
};
}
// we couldn't load it - try upgrading it from older revisions
try_upgrade_config(id).await?;
let config = match Config::read_from_default_path(id) {
Ok(cfg) => cfg,
Err(err) => {
error!("Failed to load config for {id}. Are you sure you have run `init` before? (Error was: {err})");
return Err(StatsCollectorError::FailedToLoadConfig(id.to_string()));
}
};
if !config.validate() {
return Err(StatsCollectorError::ConfigValidationFailure);
}
Ok(config)
}
@@ -0,0 +1,41 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::try_load_current_config;
use crate::cli::{override_config, OverrideConfig};
use clap::Args;
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
use nym_statistics_collector::error::StatsCollectorError;
#[allow(clippy::struct_excessive_bools)]
#[derive(Args, Clone)]
pub(crate) struct Run {
#[command(flatten)]
common_args: CommonClientRunArgs,
}
impl From<Run> for OverrideConfig {
fn from(run_config: Run) -> Self {
OverrideConfig {
nym_apis: None,
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
report_database_path: None,
}
}
}
pub(crate) async fn execute(args: &Run) -> Result<(), StatsCollectorError> {
let mut config = try_load_current_config(&args.common_args.id).await?;
config = override_config(config, OverrideConfig::from(args.clone()));
log::debug!("Using config: {:#?}", config);
log::info!("Starting statistics collector service provider");
let mut server = nym_statistics_collector::stats_collector::StatisticsCollector::new(config);
if let Some(custom_mixnet) = &args.common_args.custom_mixnet {
server = server.with_stored_topology(custom_mixnet)?
}
server.run_service_provider().await
}
@@ -0,0 +1,74 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::try_load_current_config;
use clap::Args;
use nym_statistics_collector::error::StatsCollectorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::client::key_manager::persistence::OnDiskKeys;
use nym_client_core::error::ClientCoreError;
use nym_crypto::asymmetric::identity;
use nym_types::helpers::ConsoleSigningOutput;
#[derive(Args, Clone)]
pub(crate) struct Sign {
/// The id of the mixnode you want to sign with
#[arg(long)]
id: String,
/// Signs a transaction-specific payload, that is going to be sent to the smart contract, with your identity key
#[arg(long)]
contract_msg: String,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
fn print_signed_contract_msg(
private_key: &identity::PrivateKey,
raw_msg: &str,
output: OutputFormat,
) {
let trimmed = raw_msg.trim();
eprintln!(">>> attempting to sign {trimmed}");
let Ok(decoded) = bs58::decode(trimmed).into_vec() else {
println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters");
return;
};
eprintln!(">>> decoding the message...");
// we don't really care about what particular information is embedded inside of it,
// we just want to know if user correctly copied the string, i.e. whether it's a valid bs58 encoded json
if serde_json::from_slice::<serde_json::Value>(&decoded).is_err() {
println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters");
return;
};
// if this is a valid json, it MUST be a valid string
let decoded_string = String::from_utf8(decoded.clone()).unwrap();
let signature = private_key.sign(&decoded).to_base58_string();
let sign_output = ConsoleSigningOutput::new(decoded_string, signature);
println!("{}", output.format(&sign_output));
}
pub(crate) async fn execute(args: &Sign) -> Result<(), StatsCollectorError> {
let config = try_load_current_config(&args.id).await?;
let key_store = OnDiskKeys::new(config.storage_paths.common_paths.keys);
let identity_keypair = key_store.load_identity_keypair().map_err(|source| {
StatsCollectorError::ClientCoreError(ClientCoreError::KeyStoreError {
source: Box::new(source),
})
})?;
print_signed_contract_msg(
identity_keypair.private_key(),
&args.contract_msg,
args.output,
);
Ok(())
}
@@ -0,0 +1,24 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliStatsCollectorClient;
use nym_client_core::cli_helpers::client_switch_gateway::{
switch_gateway, CommonClientSwitchGatewaysArgs,
};
use nym_statistics_collector::error::StatsCollectorError;
#[derive(clap::Args, Clone, Debug)]
pub struct Args {
#[command(flatten)]
common_args: CommonClientSwitchGatewaysArgs,
}
impl AsRef<CommonClientSwitchGatewaysArgs> for Args {
fn as_ref(&self) -> &CommonClientSwitchGatewaysArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), StatsCollectorError> {
switch_gateway::<CliStatsCollectorClient, _>(args).await
}
@@ -0,0 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use log::trace;
use std::path::Path;
use crate::error::StatsCollectorError;
pub async fn try_upgrade_config<P: AsRef<Path>>(
_config_path: P,
) -> Result<(), StatsCollectorError> {
trace!("Attempting to upgrade config");
Ok(())
}
@@ -0,0 +1,186 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_bin_common::logging::LoggingSettings;
pub use nym_client_core::config::Config as BaseClientConfig;
use nym_client_core::{cli_helpers::CliClientConfig, config::disk_persistence::CommonClientPaths};
use nym_config::{
must_get_home, save_formatted_config_to_file, NymConfigTemplate, OptionalSet,
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
};
use nym_service_providers_common::DEFAULT_SERVICE_PROVIDERS_DIR;
pub use persistence::StatsCollectorPaths;
use serde::{Deserialize, Serialize};
use std::{
io,
path::{Path, PathBuf},
str::FromStr,
};
use template::CONFIG_TEMPLATE;
pub mod helpers;
pub mod persistence;
pub mod template;
const DEFAULT_STATISTICS_COLLECTOR_DIR: &str = "statistics_collector";
/// Derive default path to stats collector's config directory.
/// It should get resolved to `$HOME/.nym/service-providers/statistics_collector/<id>/config`
pub fn default_config_directory<P: AsRef<Path>>(id: P) -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_SERVICE_PROVIDERS_DIR)
.join(DEFAULT_STATISTICS_COLLECTOR_DIR)
.join(id)
.join(DEFAULT_CONFIG_DIR)
}
/// Derive default path to stats collector's config file.
/// It should get resolved to `$HOME/.nym/service-providers/statistics_collector/<id>/config/config.toml`
pub fn default_config_filepath<P: AsRef<Path>>(id: P) -> PathBuf {
default_config_directory(id).join(DEFAULT_CONFIG_FILENAME)
}
/// Derive default path to stats collector's data directory where files, such as keys, are stored.
/// It should get resolved to `$HOME/.nym/service-providers/statistics_collector/<id>/data`
pub fn default_data_directory<P: AsRef<Path>>(id: P) -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_SERVICE_PROVIDERS_DIR)
.join(DEFAULT_STATISTICS_COLLECTOR_DIR)
.join(id)
.join(DEFAULT_DATA_DIR)
}
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
#[serde(flatten)]
pub base: BaseClientConfig,
pub storage_paths: StatsCollectorPaths,
pub logging: LoggingSettings,
}
impl NymConfigTemplate for Config {
fn template(&self) -> &'static str {
CONFIG_TEMPLATE
}
}
impl CliClientConfig for Config {
fn common_paths(&self) -> &CommonClientPaths {
&self.storage_paths.common_paths
}
fn core_config(&self) -> &BaseClientConfig {
&self.base
}
fn default_store_location(&self) -> PathBuf {
self.default_location()
}
fn save_to<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
save_formatted_config_to_file(self, path)
}
}
impl Config {
pub fn new<S: AsRef<str>>(id: S) -> Self {
Config {
base: BaseClientConfig::new(id.as_ref(), env!("CARGO_PKG_VERSION")),
storage_paths: StatsCollectorPaths::new_base(default_data_directory(id.as_ref())),
logging: Default::default(),
}
}
#[allow(unused)]
pub fn with_data_directory<P: AsRef<Path>>(mut self, data_directory: P) -> Self {
self.storage_paths = StatsCollectorPaths::new_base(data_directory);
self
}
#[allow(unused)]
pub fn with_report_database_path<P: Into<std::path::PathBuf>>(
mut self,
database_path: P,
) -> Self {
self.storage_paths.client_reports_database = database_path.into();
self
}
pub fn read_from_toml_file<P: AsRef<Path>>(path: P) -> io::Result<Self> {
nym_config::read_config_from_toml_file(path)
}
pub fn read_from_default_path<P: AsRef<Path>>(id: P) -> io::Result<Self> {
Self::read_from_toml_file(default_config_filepath(id))
}
pub fn default_location(&self) -> PathBuf {
default_config_filepath(&self.base.client.id)
}
#[allow(unused)]
pub fn save_to_default_location(&self) -> io::Result<()> {
let config_save_location: PathBuf = self.default_location();
save_formatted_config_to_file(self, config_save_location)
}
pub fn validate(&self) -> bool {
// no other sections have explicit requirements (yet)
self.base.validate()
}
#[doc(hidden)]
pub fn set_no_poisson_process(&mut self) {
self.base.set_no_poisson_process()
}
// poor man's 'builder' method
#[allow(unused)]
pub fn with_base<F, T>(mut self, f: F, val: T) -> Self
where
F: Fn(BaseClientConfig, T) -> BaseClientConfig,
{
self.base = f(self.base, val);
self
}
// helper methods to use `OptionalSet` trait. Those are defined due to very... ehm. 'specific' structure of this config
// (plz, lets refactor it)
pub fn with_optional_base<F, T>(mut self, f: F, val: Option<T>) -> Self
where
F: Fn(BaseClientConfig, T) -> BaseClientConfig,
{
self.base = self.base.with_optional(f, val);
self
}
#[allow(unused)]
pub fn with_optional_base_env<F, T>(mut self, f: F, val: Option<T>, env_var: &str) -> Self
where
F: Fn(BaseClientConfig, T) -> BaseClientConfig,
T: FromStr,
<T as FromStr>::Err: std::fmt::Debug,
{
self.base = self.base.with_optional_env(f, val, env_var);
self
}
pub fn with_optional_base_custom_env<F, T, G>(
mut self,
f: F,
val: Option<T>,
env_var: &str,
parser: G,
) -> Self
where
F: Fn(BaseClientConfig, T) -> BaseClientConfig,
G: Fn(&str) -> T,
{
self.base = self.base.with_optional_custom_env(f, val, env_var, parser);
self
}
}
@@ -0,0 +1,26 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_client_core::config::disk_persistence::CommonClientPaths;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
pub const DEFAULT_REPORT_DATABASE_PATH: &str = "report.db";
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
pub struct StatsCollectorPaths {
#[serde(flatten)]
pub common_paths: CommonClientPaths,
pub client_reports_database: PathBuf,
}
impl StatsCollectorPaths {
pub fn new_base<P: AsRef<Path>>(base_data_directory: P) -> Self {
let base_dir = base_data_directory.as_ref();
Self {
common_paths: CommonClientPaths::new_base(base_dir),
client_reports_database: base_dir.join(DEFAULT_REPORT_DATABASE_PATH),
}
}
}
@@ -0,0 +1,95 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub(crate) const CONFIG_TEMPLATE: &str =
// While using normal toml marshalling would have been way simpler with less overhead,
// I think it's useful to have comments attached to the saved config file to explain behaviour of
// particular fields.
// Note: any changes to the template must be reflected in the appropriate structs.
r#"
# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
##### main base client config options #####
[client]
# Version of the client for which this configuration was created.
version = '{{ client.version }}'
# Human readable ID of this particular client.
id = '{{ client.id }}'
# Indicates whether this client is running in a disabled credentials mode, thus attempting
# to claim bandwidth without presenting bandwidth credentials.
disabled_credentials_mode = {{ client.disabled_credentials_mode }}
# Addresses to nyxd validators via which the client can communicate with the chain.
nyxd_urls = [
{{#each client.nyxd_urls }}
'{{this}}',
{{/each}}
]
# Addresses to APIs running on validator from which the client gets the view of the network.
nym_api_urls = [
{{#each client.nym_api_urls }}
'{{this}}',
{{/each}}
]
[storage_paths]
# Path to file containing private identity key.
keys.private_identity_key_file = '{{ storage_paths.keys.private_identity_key_file }}'
# Path to file containing public identity key.
keys.public_identity_key_file = '{{ storage_paths.keys.public_identity_key_file }}'
# Path to file containing private encryption key.
keys.private_encryption_key_file = '{{ storage_paths.keys.private_encryption_key_file }}'
# Path to file containing public encryption key.
keys.public_encryption_key_file = '{{ storage_paths.keys.public_encryption_key_file }}'
# Path to file containing key used for encrypting and decrypting the content of an
# acknowledgement so that nobody besides the client knows which packet it refers to.
keys.ack_key_file = '{{ storage_paths.keys.ack_key_file }}'
# Path to the database containing bandwidth credentials
credentials_database = '{{ storage_paths.credentials_database }}'
# Path to the persistent store for received reply surbs, unused encryption keys and used sender tags.
reply_surb_database = '{{ storage_paths.reply_surb_database }}'
# Path to the file containing information about gateways used by this client,
# i.e. details such as their public keys, owner addresses or the network information.
gateway_registrations = '{{ storage_paths.gateway_registrations }}'
# Path to the client reports database
client_reports_database = '{{ storage_paths.client_reports_database }}'
##### logging configuration options #####
[logging]
# TODO
##### debug configuration options #####
# The following options should not be modified unless you know EXACTLY what you are doing
# as if set incorrectly, they may impact your anonymity.
[debug]
[debug.traffic]
average_packet_delay = '{{ debug.traffic.average_packet_delay }}'
message_sending_average_delay = '{{ debug.traffic.message_sending_average_delay }}'
[debug.acknowledgements]
average_ack_delay = '{{ debug.acknowledgements.average_ack_delay }}'
[debug.cover_traffic]
loop_cover_traffic_average_delay = '{{ debug.cover_traffic.loop_cover_traffic_average_delay }}'
"#;
@@ -0,0 +1,38 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_client_core::error::ClientCoreError;
use nym_id::NymIdError;
#[derive(thiserror::Error, Debug)]
pub enum StatsCollectorError {
#[error("client-core error: {0}")]
ClientCoreError(#[from] ClientCoreError),
// TODO: add more details here
#[error("failed to validate the loaded config")]
ConfigValidationFailure,
#[error("failed to connect to mixnet: {source}")]
FailedToConnectToMixnet { source: nym_sdk::Error },
#[error("failed to load configuration file: {0}")]
FailedToLoadConfig(String),
#[error("failed to setup mixnet client: {source}")]
FailedToSetupMixnetClient { source: nym_sdk::Error },
#[error("Stats error : {0}")]
StatsError(#[from] nym_statistics_common::error::StatsError),
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),
#[error(transparent)]
NymIdError(#[from] NymIdError),
#[error("Storage error : {0}")]
ReportStorageError(#[from] crate::storage::error::ClientStatsReportStorageError),
}
pub type Result<T> = std::result::Result<T, StatsCollectorError>;
@@ -0,0 +1,9 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod config;
pub mod error;
pub mod mixnet_client;
pub mod mixnet_listener;
pub mod stats_collector;
pub mod storage;
@@ -0,0 +1,20 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
mod cli;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
let args = cli::Cli::parse();
nym_bin_common::logging::setup_logging();
nym_network_defaults::setup_env(args.config_env_file.as_ref());
if !args.no_banner {
nym_bin_common::logging::maybe_print_banner(clap::crate_name!(), clap::crate_version!());
}
cli::execute(args).await?;
Ok(())
}
@@ -0,0 +1,70 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_client_core::{
config::{disk_persistence::CommonClientPaths, StatsReporting},
ForgetMe, TopologyProvider,
};
use nym_sdk::{GatewayTransceiver, NymNetworkDetails};
use nym_task::TaskClient;
use crate::{config::BaseClientConfig, error::StatsCollectorError};
// Helper function to create the mixnet client.
// This is NOT in the SDK since we don't want to expose any of the client-core config types.
// We could however consider moving it to a crate in common in the future.
// TODO: refactor this function and its arguments
pub async fn create_mixnet_client(
config: &BaseClientConfig,
shutdown: TaskClient,
custom_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
wait_for_gateway: bool,
paths: &CommonClientPaths,
) -> Result<nym_sdk::mixnet::MixnetClient, StatsCollectorError> {
let mut debug_config = config.debug;
//SW do we need cover traffic if we're only gonna receive stuff?
debug_config
.traffic
.disable_main_poisson_packet_distribution = true;
debug_config.cover_traffic.disable_loop_cover_traffic_stream = true;
debug_config.stats_reporting.enabled = false;
debug_config.topology.ignore_egress_epoch_role = true; //necessary for a fixed address
let storage_paths = nym_sdk::mixnet::StoragePaths::from(paths.clone());
let mut client_builder =
nym_sdk::mixnet::MixnetClientBuilder::new_with_default_storage(storage_paths)
.await
.map_err(|err| StatsCollectorError::FailedToSetupMixnetClient { source: err })?
.network_details(NymNetworkDetails::new_from_env())
.debug_config(debug_config)
.custom_shutdown(shutdown)
.with_user_agent(nym_bin_common::bin_info!().into())
.with_wait_for_gateway(wait_for_gateway)
.with_statistics_reporting(StatsReporting {
enabled: false,
..Default::default()
})
.with_forget_me(ForgetMe::new_all());
if !config.get_disabled_credentials_mode() {
client_builder = client_builder.enable_credentials_mode();
}
if let Some(gateway_transceiver) = custom_transceiver {
client_builder = client_builder.custom_gateway_transceiver(gateway_transceiver);
}
if let Some(topology_provider) = custom_topology_provider {
client_builder = client_builder.custom_topology_provider(topology_provider);
}
let mixnet_client = client_builder
.build()
.map_err(|err| StatsCollectorError::FailedToSetupMixnetClient { source: err })?;
mixnet_client
.connect_to_mixnet()
.await
.map_err(|err| StatsCollectorError::FailedToConnectToMixnet { source: err })
}
@@ -0,0 +1,82 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{error::*, storage::ClientStatsStorage};
use futures::StreamExt;
use nym_sdk::TaskClient;
use nym_sphinx::receiver::ReconstructedMessage;
use nym_statistics_common::report::ClientStatsReport;
pub(crate) struct MixnetListener {
// The mixnet client that we use to send and receive packets from the mixnet
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
// Report storage
pub(crate) client_report_storage: ClientStatsStorage,
// The task client for the main loop
pub(crate) task_client: TaskClient,
}
impl MixnetListener {
pub fn new(
mixnet_client: nym_sdk::mixnet::MixnetClient,
client_report_storage: ClientStatsStorage,
task_client: TaskClient,
) -> Self {
MixnetListener {
mixnet_client,
client_report_storage,
task_client,
}
}
async fn on_reconstructed_message(
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<String> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
);
let report = deserialize_stats_report(&reconstructed)?;
self.client_report_storage
.store_report(report.clone())
.await?;
Ok(report.client_id)
}
pub(crate) async fn run(mut self) -> Result<()> {
while !self.task_client.is_shutdown() {
tokio::select! {
_ = self.task_client.recv() => {
log::debug!("Statistics collector [main loop]: received shutdown");
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg).await {
Ok(client_id) => {
log::info!("Successfully stored client reports from ID : {client_id}")
},
Err(err) => {
log::error!("Error handling reconstructed mixnet message: {err}");
}
};
} else {
log::trace!("Statistics collector [main loop]: stopping since channel closed");
break;
};
},
}
}
log::debug!("Statistics collector: stopping");
Ok(())
}
}
fn deserialize_stats_report(reconstructed: &ReconstructedMessage) -> Result<ClientStatsReport> {
let report_bytes: &[u8] = &reconstructed.message;
Ok(report_bytes.try_into()?)
}
@@ -0,0 +1,119 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::path::Path;
use nym_client_core::{HardcodedTopologyProvider, TopologyProvider};
use nym_sdk::GatewayTransceiver;
use nym_task::{TaskClient, TaskHandle};
use crate::{config::Config, error::StatsCollectorError, storage::ClientStatsStorage};
pub struct StatisticsCollector {
#[allow(unused)]
config: Config,
wait_for_gateway: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
shutdown: Option<TaskClient>,
}
impl StatisticsCollector {
pub fn new(config: Config) -> Self {
Self {
config,
wait_for_gateway: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
}
}
#[must_use]
#[allow(unused)]
pub fn with_shutdown(mut self, shutdown: TaskClient) -> Self {
self.shutdown = Some(shutdown);
self
}
#[must_use]
#[allow(unused)]
pub fn with_report_database_path<P: Into<std::path::PathBuf>>(
mut self,
database_path: P,
) -> Self {
self.config.storage_paths.client_reports_database = database_path.into();
self
}
#[must_use]
#[allow(unused)]
pub fn with_wait_for_gateway(mut self, wait_for_gateway: bool) -> Self {
self.wait_for_gateway = wait_for_gateway;
self
}
#[must_use]
#[allow(unused)]
pub fn with_custom_gateway_transceiver(
mut self,
gateway_transceiver: Box<dyn GatewayTransceiver + Send + Sync>,
) -> Self {
self.custom_gateway_transceiver = Some(gateway_transceiver);
self
}
#[must_use]
#[allow(unused)]
pub fn with_custom_topology_provider(
mut self,
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
) -> Self {
self.custom_topology_provider = Some(topology_provider);
self
}
pub fn with_stored_topology<P: AsRef<Path>>(
mut self,
file: P,
) -> Result<Self, StatsCollectorError> {
self.custom_topology_provider =
Some(Box::new(HardcodedTopologyProvider::new_from_file(file)?));
Ok(self)
}
pub async fn run_service_provider(self) -> Result<(), StatsCollectorError> {
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
let task_handle: TaskHandle = self.shutdown.map(Into::into).unwrap_or_default();
// Connect to the mixnet
let mixnet_client = crate::mixnet_client::create_mixnet_client(
&self.config.base,
task_handle
.get_handle()
.named("nym_sdk::MixnetClient[STATS]"),
self.custom_gateway_transceiver,
self.custom_topology_provider,
self.wait_for_gateway,
&self.config.storage_paths.common_paths,
)
.await?;
let self_address = *mixnet_client.nym_address();
let report_storage =
ClientStatsStorage::init(self.config.storage_paths.client_reports_database).await?;
let mixnet_listener = crate::mixnet_listener::MixnetListener::new(
mixnet_client,
report_storage,
task_handle.fork("mixnet_listener"),
);
tokio::spawn(async move { mixnet_listener.run().await });
log::info!("The address of this client is: {self_address}");
log::info!("All systems go. Press CTRL-C to stop the server.");
let _ = task_handle.wait_for_shutdown().await;
Ok(())
}
}
@@ -0,0 +1,69 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_statistics_common::report::ClientStatsReport;
pub(crate) type Result<T> = std::result::Result<T, sqlx::Error>;
#[derive(Clone)]
pub(crate) struct ClientStatsReportManager {
connection_pool: sqlx::SqlitePool,
}
impl ClientStatsReportManager {
/// Creates new instance of the `ClientStatsReportManager` with the provided sqlite connection pool.
///
/// # Arguments
///
/// * `connection_pool`: database connection pool to use.
pub(crate) fn new(connection_pool: sqlx::SqlitePool) -> Self {
ClientStatsReportManager { connection_pool }
}
pub async fn store_report(&mut self, report: ClientStatsReport) -> Result<()> {
self.store_base(&report).await?;
self.store_connection_stats(&report).await?;
Ok(())
}
async fn store_base(&self, report: &ClientStatsReport) -> Result<()> {
let report_day = report.last_update_time.date();
sqlx::query!(
"INSERT OR IGNORE INTO report (day, client_id, client_type, os_type, os_version, architecture) VALUES (?, ?, ?, ?, ?, ?)",
report_day,
report.client_id,
report.client_type,
report.os_information.os_type,
report.os_information.os_version,
report.os_information.os_arch
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
async fn store_connection_stats(&self, report: &ClientStatsReport) -> Result<()> {
sqlx::query!(
"INSERT OR IGNORE INTO connection_stats (
received_at,
client_id,
mixnet_entry_spent,
vpn_entry_spent,
mixnet_exit_spent,
vpn_exit_spent,
wg_exit_country_code,
mix_exit_country_code) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
report.last_update_time,
report.client_id,
report.connection_stats.mixnet_entry_spent,
report.connection_stats.vpn_entry_spent,
report.connection_stats.mixnet_exit_spent,
report.connection_stats.vpn_exit_spent,
report.connection_stats.wg_exit_country_code,
report.connection_stats.mix_exit_country_code
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
@@ -0,0 +1,13 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ClientStatsReportStorageError {
#[error("Database experienced an internal error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),
#[error("Failed to perform database migration: {0}")]
MigrationError(#[from] sqlx::migrate::MigrateError),
}
@@ -0,0 +1,79 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use error::ClientStatsReportStorageError;
use log::{debug, error};
use nym_statistics_common::report::ClientStatsReport;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
pub mod error;
//pub mod models;
mod client_stats_report;
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub struct ClientStatsStorage {
client_stats_report_manager: client_stats_report::ClientStatsReportManager,
}
impl ClientStatsStorage {
/// Initialises `ClientStatsStorage` using the provided path.
///
/// # Arguments
///
/// * `database_path`: path to the database.
pub async fn init<P: AsRef<Path> + Send>(
database_path: P,
) -> Result<Self, ClientStatsReportStorageError> {
debug!(
"Attempting to connect to database {:?}",
database_path.as_ref().as_os_str()
);
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
error!("Failed to perform migration on the SQLx database: {err}");
return Err(err.into());
}
// the cloning here are cheap as connection pool is stored behind an Arc
Ok(ClientStatsStorage {
client_stats_report_manager: client_stats_report::ClientStatsReportManager::new(
connection_pool,
),
})
}
pub(crate) async fn store_report(
&mut self,
report: ClientStatsReport,
) -> Result<(), ClientStatsReportStorageError> {
Ok(self
.client_stats_report_manager
.store_report(report)
.await?)
}
}