Compare commits

...

1 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 407725f697 Introduce event backchannel (#6119)
* Introduce even backchannel

* Rust fmt

* Rename Event to MixnetClientEvent

* Use unbounded_send for events

* Remove unused file

* Remove mut borrow

* Event hierarchy and mixnet client intermediary

* Export MixTrafficEvent in sdk
2025-10-17 08:47:43 +03:00
12 changed files with 171 additions and 32 deletions
Generated
+1
View File
@@ -6672,6 +6672,7 @@ dependencies = [
name = "nym-registration-client"
version = "0.1.0"
dependencies = [
"futures",
"nym-authenticator-client",
"nym-bandwidth-controller",
"nym-credential-storage",
@@ -7,11 +7,12 @@ use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
@@ -66,7 +67,6 @@ use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::*;
use url::Url;
#[cfg(all(
@@ -79,6 +79,23 @@ pub mod non_wasm_helpers;
pub mod helpers;
pub mod storage;
#[derive(Clone, Copy, Debug)]
pub enum MixnetClientEvent {
Traffic(MixTrafficEvent),
}
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
#[derive(Clone)]
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
impl EventSender {
pub fn send(&self, event: MixnetClientEvent) {
if let Err(err) = self.0.unbounded_send(event) {
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
}
}
}
#[derive(Clone)]
pub struct ClientInput {
pub connection_command_sender: ConnectionCommandSender,
@@ -194,6 +211,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<ShutdownTracker>,
event_tx: Option<EventSender>,
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
@@ -222,6 +240,7 @@ where
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
event_tx: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
@@ -284,6 +303,12 @@ where
self
}
#[must_use]
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
self.event_tx = Some(event_tx);
self
}
#[must_use]
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
self.user_agent = Some(user_agent);
@@ -314,6 +339,18 @@ where
details.client_address()
}
fn start_event_control(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
shutdown_tracker: &ShutdownTracker,
) {
let event_control = EventControl::new(parent_event_tx, children_event_rx);
shutdown_tracker.try_spawn_named_with_shutdown(
async move { event_control.run().await },
"EventControl",
);
}
// future constantly pumping loop cover traffic at some specified average rate
// the pumped traffic goes to the MixTrafficController
fn start_cover_traffic_stream(
@@ -325,7 +362,7 @@ where
stats_tx: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting loop cover traffic stream...");
tracing::info!("Starting loop cover traffic stream...");
let mut stream = LoopCoverTrafficStream::new(
ack_key,
@@ -357,7 +394,7 @@ where
stats_tx: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting real traffic stream...");
tracing::info!("Starting real traffic stream...");
let real_messages_controller = RealMessagesController::new(
controller_config,
@@ -442,7 +479,7 @@ where
metrics_reporter: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting received messages buffer controller...");
tracing::info!("Starting received messages buffer controller...");
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
local_encryption_keypair,
query_receiver,
@@ -553,7 +590,7 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -650,7 +687,7 @@ where
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The background topology refresher is not going to be started");
tracing::info!("The background topology refresher is not going to be started");
}
let mut topology_refresher = TopologyRefresher::new(
@@ -660,7 +697,7 @@ where
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
tracing::info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
@@ -686,13 +723,13 @@ where
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
error!(
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
}
@@ -700,7 +737,7 @@ where
if !topology_config.disable_refreshing {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
tracing::info!("Starting topology refresher...");
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
"TopologyRefresher",
@@ -717,7 +754,7 @@ where
input_sender: Sender<InputMessage>,
shutdown_tracker: &ShutdownTracker,
) -> ClientStatsSender {
info!("Starting statistics control...");
tracing::info!("Starting statistics control...");
StatisticsControl::create_and_start(
config.debug.stats_reporting,
user_agent
@@ -732,10 +769,14 @@ where
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_tracker: &ShutdownTracker,
event_tx: EventSender,
) -> (BatchMixMessageSender, ClientRequestSender) {
info!("Starting mix traffic controller...");
let (mut mix_traffic_controller, mix_tx, client_tx) =
MixTrafficController::new(gateway_transceiver, shutdown_tracker.clone_shutdown_token());
tracing::info!("Starting mix traffic controller...");
let (mut mix_traffic_controller, mix_tx, client_tx) = MixTrafficController::new(
gateway_transceiver,
shutdown_tracker.clone_shutdown_token(),
event_tx,
);
shutdown_tracker.try_spawn_named(
async move { mix_traffic_controller.run().await },
@@ -799,7 +840,7 @@ where
{
// if client keys do not exist already, create and persist them
if key_store.load_keys().await.is_err() {
info!("could not find valid client keys - a new set will be generated");
tracing::info!("could not find valid client keys - a new set will be generated");
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
@@ -846,7 +887,7 @@ where
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
{
info!("Starting nym client");
tracing::info!("Starting nym client");
// derive (or load) client keys and gateway configuration
let init_res = Self::initialise_keys_and_gateway(
@@ -875,6 +916,9 @@ where
// channels responsible for controlling real messages
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for event management
let (event_sender, event_receiver) = mpsc::unbounded();
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor =
@@ -887,6 +931,8 @@ where
None => nym_task::get_sdk_shutdown_tracker()?,
};
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
// channels responsible for dealing with reply-related fun
let (reply_controller_sender, reply_controller_receiver) =
reply_controller::requests::new_control_channels();
@@ -977,6 +1023,7 @@ where
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
EventSender(event_sender),
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -1026,8 +1073,8 @@ where
);
}
debug!("Core client startup finished!");
debug!("The address of this client is: {self_address}");
tracing::debug!("Core client startup finished!");
tracing::debug!("The address of this client is: {self_address}");
Ok(BaseClient {
address: self_address,
@@ -0,0 +1,40 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
/// Launches and manages task events, propagating upwards what is not strictly internal.
pub(crate) struct EventControl {
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
}
impl EventControl {
pub(crate) fn new(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
) -> Self {
EventControl {
parent_event_tx,
children_event_rx,
}
}
fn is_internal(event: MixnetClientEvent) -> bool {
match event {
MixnetClientEvent::Traffic(_) => false,
}
}
pub(crate) async fn run(mut self) {
while let Some(event) = self.children_event_rx.next().await {
if let Some(parent_event_tx) = &self.parent_event_tx {
if !Self::is_internal(event) {
parent_event_tx.send(event);
}
}
}
}
}
@@ -1,7 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::client::{
base_client::{EventSender, MixnetClientEvent},
mix_traffic::transceiver::GatewayTransceiver,
};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
@@ -22,6 +25,11 @@ const MAX_FAILURE_COUNT: usize = 100;
// that's also disgusting.
pub struct Empty;
#[derive(Clone, Copy, Debug)]
pub enum MixTrafficEvent {
FailedSendingSphinx,
}
pub struct MixTrafficController {
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
@@ -33,12 +41,14 @@ pub struct MixTrafficController {
consecutive_gateway_failure_count: usize,
shutdown_token: ShutdownToken,
event_tx: EventSender,
}
impl MixTrafficController {
pub fn new<T>(
gateway_transceiver: T,
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> (
MixTrafficController,
BatchMixMessageSender,
@@ -59,6 +69,7 @@ impl MixTrafficController {
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
shutdown_token,
event_tx,
},
message_sender,
client_sender,
@@ -68,6 +79,7 @@ impl MixTrafficController {
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> (
MixTrafficController,
BatchMixMessageSender,
@@ -83,6 +95,7 @@ impl MixTrafficController {
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
shutdown_token,
event_tx,
},
message_sender,
client_sender,
@@ -155,6 +168,7 @@ impl MixTrafficController {
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
break;
}
}
+1
View File
@@ -3,6 +3,7 @@
pub mod base_client;
pub mod cover_traffic_stream;
pub(crate) mod event_control;
pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
+1
View File
@@ -12,6 +12,7 @@ license.workspace = true
workspace = true
[dependencies]
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
@@ -110,7 +110,6 @@ impl BuilderConfig {
.credentials_mode(true)
.with_remember_me(remember_me)
.custom_topology_provider(self.custom_topology_provider);
#[cfg(unix)]
let builder = builder.with_connection_fd_callback(self.connection_fd_callback);
+7 -3
View File
@@ -1,10 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use nym_bandwidth_controller::{BandwidthController, BandwidthTicketProvider};
use nym_credential_storage::ephemeral_storage::EphemeralCredentialStorage;
use nym_sdk::{
mixnet::{MixnetClient, MixnetClientBuilder},
mixnet::{EventSender, MixnetClient, MixnetClientBuilder},
NymNetworkDetails,
};
use nym_validator_client::{
@@ -38,6 +39,7 @@ impl RegistrationClientBuilder {
data_path: self.config.data_path.clone(),
};
let cancel_token = self.config.cancel_token.clone();
let (event_tx, event_rx) = mpsc::unbounded();
let nyxd_client = get_nyxd_client(&self.config.network_env)?;
@@ -45,7 +47,8 @@ impl RegistrationClientBuilder {
MixnetClient,
Box<dyn BandwidthTicketProvider>,
) = if let Some((mixnet_client_storage, credential_storage)) = storage {
let builder = MixnetClientBuilder::new_with_storage(mixnet_client_storage);
let builder = MixnetClientBuilder::new_with_storage(mixnet_client_storage)
.event_tx(EventSender(event_tx));
let mixnet_client = tokio::time::timeout(
MIXNET_CLIENT_STARTUP_TIMEOUT,
self.config.build_and_connect_mixnet_client(builder),
@@ -55,7 +58,7 @@ impl RegistrationClientBuilder {
Box::new(BandwidthController::new(credential_storage, nyxd_client));
(mixnet_client, bandwidth_controller)
} else {
let builder = MixnetClientBuilder::new_ephemeral();
let builder = MixnetClientBuilder::new_ephemeral().event_tx(EventSender(event_tx));
let mixnet_client = tokio::time::timeout(
MIXNET_CLIENT_STARTUP_TIMEOUT,
self.config.build_and_connect_mixnet_client(builder),
@@ -75,6 +78,7 @@ impl RegistrationClientBuilder {
cancel_token,
mixnet_client_address,
bandwidth_controller,
event_rx,
})
}
}
+3 -1
View File
@@ -8,7 +8,7 @@ use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_credentials_interface::TicketType;
use nym_ip_packet_client::IprClientConnect;
use nym_registration_common::AssignedAddresses;
use nym_sdk::mixnet::{MixnetClient, Recipient};
use nym_sdk::mixnet::{EventReceiver, MixnetClient, Recipient};
use crate::config::RegistrationClientConfig;
@@ -28,6 +28,7 @@ pub struct RegistrationClient {
mixnet_client_address: Recipient,
bandwidth_controller: Box<dyn BandwidthTicketProvider>,
cancel_token: CancellationToken,
event_rx: EventReceiver,
}
impl RegistrationClient {
@@ -58,6 +59,7 @@ impl RegistrationClient {
entry_mixnet_gateway_ip,
exit_mixnet_gateway_ip,
},
event_rx: self.event_rx,
},
)))
}
+2 -1
View File
@@ -4,7 +4,7 @@
use nym_authenticator_client::{AuthClientMixnetListenerHandle, AuthenticatorClient};
use nym_bandwidth_controller::BandwidthTicketProvider;
use nym_registration_common::{AssignedAddresses, GatewayData};
use nym_sdk::mixnet::MixnetClient;
use nym_sdk::mixnet::{EventReceiver, MixnetClient};
pub enum RegistrationResult {
Mixnet(Box<MixnetRegistrationResult>),
@@ -14,6 +14,7 @@ pub enum RegistrationResult {
pub struct MixnetRegistrationResult {
pub assigned_addresses: AssignedAddresses,
pub mixnet_client: MixnetClient,
pub event_rx: EventReceiver,
}
pub struct WireguardRegistrationResult {
+9 -3
View File
@@ -45,15 +45,21 @@ pub use native_client::MixnetClient;
pub use native_client::MixnetClientSender;
#[allow(deprecated)]
pub use nym_client_core::client::{
base_client::storage::{
gateways_storage::{ActiveGateway, BadGateway, GatewayRegistration, GatewaysDetailsStore},
Ephemeral, MixnetClientStorage, OnDiskPersistent,
base_client::{
storage::{
gateways_storage::{
ActiveGateway, BadGateway, GatewayRegistration, GatewaysDetailsStore,
},
Ephemeral, MixnetClientStorage, OnDiskPersistent,
},
EventReceiver, EventSender, MixnetClientEvent,
},
inbound_messages::InputMessage,
key_manager::{
persistence::{InMemEphemeralKeys, KeyStore, OnDiskKeys},
ClientKeys,
},
mix_traffic::MixTrafficEvent,
replies::reply_storage::{
fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage,
ReplyStorageBackend,
+26 -3
View File
@@ -16,7 +16,7 @@ use nym_client_core::client::base_client::storage::helpers::{
use nym_client_core::client::base_client::storage::{
Ephemeral, GatewaysDetailsStore, MixnetClientStorage, OnDiskPersistent,
};
use nym_client_core::client::base_client::BaseClient;
use nym_client_core::client::base_client::{BaseClient, EventSender};
use nym_client_core::client::key_manager::persistence::KeyStore;
use nym_client_core::client::{
base_client::BaseClientBuilder, replies::reply_storage::ReplyStorageBackend,
@@ -53,6 +53,7 @@ pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
custom_shutdown: Option<ShutdownTracker>,
event_tx: Option<EventSender>,
force_tls: bool,
user_agent: Option<UserAgent>,
#[cfg(unix)]
@@ -96,6 +97,7 @@ impl MixnetClientBuilder<OnDiskPersistent> {
.await?,
gateway_endpoint_config_path: None,
custom_shutdown: None,
event_tx: None,
custom_gateway_transceiver: None,
force_tls: false,
user_agent: None,
@@ -129,6 +131,7 @@ where
custom_topology_provider: None,
custom_gateway_transceiver: None,
custom_shutdown: None,
event_tx: None,
force_tls: false,
user_agent: None,
#[cfg(unix)]
@@ -152,6 +155,7 @@ where
custom_topology_provider: self.custom_topology_provider,
custom_gateway_transceiver: self.custom_gateway_transceiver,
custom_shutdown: self.custom_shutdown,
event_tx: self.event_tx,
force_tls: self.force_tls,
user_agent: self.user_agent,
#[cfg(unix)]
@@ -269,6 +273,13 @@ where
self
}
/// Use an externally managed shutdown mechanism.
#[must_use]
pub fn event_tx(mut self, event_tx: EventSender) -> Self {
self.event_tx = Some(event_tx);
self
}
/// Attempt to wait for the selected gateway (if applicable) to come online if its currently not bonded.
#[must_use]
pub fn with_wait_for_gateway(mut self, wait_for_gateway: bool) -> Self {
@@ -317,8 +328,12 @@ where
/// Construct a [`DisconnectedMixnetClient`] from the setup specified.
pub fn build(self) -> Result<DisconnectedMixnetClient<S>> {
let mut client =
DisconnectedMixnetClient::new(self.config, self.socks5_config, self.storage)?;
let mut client = DisconnectedMixnetClient::new(
self.config,
self.socks5_config,
self.storage,
self.event_tx,
)?;
client.custom_gateway_transceiver = self.custom_gateway_transceiver;
client.custom_topology_provider = self.custom_topology_provider;
@@ -380,6 +395,9 @@ where
/// Allows passing an externally controlled shutdown handle.
custom_shutdown: Option<ShutdownTracker>,
/// Sender of mixnet client events to the SDK caller
event_tx: Option<EventSender>,
user_agent: Option<UserAgent>,
/// Callback on the websocket fd as soon as the connection has been established
@@ -415,6 +433,7 @@ where
config: Config,
socks5_config: Option<Socks5>,
storage: S,
event_tx: Option<EventSender>,
) -> Result<DisconnectedMixnetClient<S>> {
// don't create dkg client for the bandwidth controller if credentials are disabled
let dkg_query_client = if config.enabled_credentials_mode {
@@ -443,6 +462,7 @@ where
wait_for_gateway: false,
force_tls: false,
custom_shutdown: None,
event_tx,
user_agent: None,
#[cfg(unix)]
connection_fd_callback: None,
@@ -699,6 +719,9 @@ where
}
};
base_builder = base_builder.with_shutdown(shutdown_tracker);
if let Some(event_tx) = self.event_tx {
base_builder = base_builder.with_event_tx(event_tx);
}
if let Some(gateway_transceiver) = self.custom_gateway_transceiver {
base_builder = base_builder.with_gateway_transceiver(gateway_transceiver);