Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 407725f697 |
Generated
+1
@@ -6672,6 +6672,7 @@ dependencies = [
|
||||
name = "nym-registration-client"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nym-authenticator-client",
|
||||
"nym-bandwidth-controller",
|
||||
"nym-credential-storage",
|
||||
|
||||
@@ -7,11 +7,12 @@ use super::statistics_control::StatisticsControl;
|
||||
use crate::client::base_client::storage::helpers::store_client_keys;
|
||||
use crate::client::base_client::storage::MixnetClientStorage;
|
||||
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||
use crate::client::event_control::EventControl;
|
||||
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
|
||||
use crate::client::key_manager::persistence::KeyStore;
|
||||
use crate::client::key_manager::ClientKeys;
|
||||
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
|
||||
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
|
||||
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
|
||||
use crate::client::real_messages_control;
|
||||
use crate::client::real_messages_control::RealMessagesController;
|
||||
use crate::client::received_buffer::{
|
||||
@@ -66,7 +67,6 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(all(
|
||||
@@ -79,6 +79,23 @@ pub mod non_wasm_helpers;
|
||||
pub mod helpers;
|
||||
pub mod storage;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum MixnetClientEvent {
|
||||
Traffic(MixTrafficEvent),
|
||||
}
|
||||
|
||||
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
|
||||
#[derive(Clone)]
|
||||
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
|
||||
|
||||
impl EventSender {
|
||||
pub fn send(&self, event: MixnetClientEvent) {
|
||||
if let Err(err) = self.0.unbounded_send(event) {
|
||||
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientInput {
|
||||
pub connection_command_sender: ConnectionCommandSender,
|
||||
@@ -194,6 +211,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
|
||||
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
|
||||
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
|
||||
shutdown: Option<ShutdownTracker>,
|
||||
event_tx: Option<EventSender>,
|
||||
user_agent: Option<UserAgent>,
|
||||
|
||||
setup_method: GatewaySetup,
|
||||
@@ -222,6 +240,7 @@ where
|
||||
custom_topology_provider: None,
|
||||
custom_gateway_transceiver: None,
|
||||
shutdown: None,
|
||||
event_tx: None,
|
||||
user_agent: None,
|
||||
setup_method: GatewaySetup::MustLoad { gateway_id: None },
|
||||
#[cfg(unix)]
|
||||
@@ -284,6 +303,12 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
|
||||
self.event_tx = Some(event_tx);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
|
||||
self.user_agent = Some(user_agent);
|
||||
@@ -314,6 +339,18 @@ where
|
||||
details.client_address()
|
||||
}
|
||||
|
||||
fn start_event_control(
|
||||
parent_event_tx: Option<EventSender>,
|
||||
children_event_rx: EventReceiver,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) {
|
||||
let event_control = EventControl::new(parent_event_tx, children_event_rx);
|
||||
shutdown_tracker.try_spawn_named_with_shutdown(
|
||||
async move { event_control.run().await },
|
||||
"EventControl",
|
||||
);
|
||||
}
|
||||
|
||||
// future constantly pumping loop cover traffic at some specified average rate
|
||||
// the pumped traffic goes to the MixTrafficController
|
||||
fn start_cover_traffic_stream(
|
||||
@@ -325,7 +362,7 @@ where
|
||||
stats_tx: ClientStatsSender,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) {
|
||||
info!("Starting loop cover traffic stream...");
|
||||
tracing::info!("Starting loop cover traffic stream...");
|
||||
|
||||
let mut stream = LoopCoverTrafficStream::new(
|
||||
ack_key,
|
||||
@@ -357,7 +394,7 @@ where
|
||||
stats_tx: ClientStatsSender,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) {
|
||||
info!("Starting real traffic stream...");
|
||||
tracing::info!("Starting real traffic stream...");
|
||||
|
||||
let real_messages_controller = RealMessagesController::new(
|
||||
controller_config,
|
||||
@@ -442,7 +479,7 @@ where
|
||||
metrics_reporter: ClientStatsSender,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) {
|
||||
info!("Starting received messages buffer controller...");
|
||||
tracing::info!("Starting received messages buffer controller...");
|
||||
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
|
||||
local_encryption_keypair,
|
||||
query_receiver,
|
||||
@@ -553,7 +590,7 @@ where
|
||||
details_store
|
||||
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
|
||||
.await.map_err(|err| {
|
||||
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
|
||||
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
|
||||
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
|
||||
})?
|
||||
}
|
||||
@@ -650,7 +687,7 @@ where
|
||||
|
||||
if topology_config.disable_refreshing {
|
||||
// if we're not spawning the refresher, don't cause shutdown immediately
|
||||
info!("The background topology refresher is not going to be started");
|
||||
tracing::info!("The background topology refresher is not going to be started");
|
||||
}
|
||||
|
||||
let mut topology_refresher = TopologyRefresher::new(
|
||||
@@ -660,7 +697,7 @@ where
|
||||
);
|
||||
// before returning, block entire runtime to refresh the current network view so that any
|
||||
// components depending on topology would see a non-empty view
|
||||
info!("Obtaining initial network topology");
|
||||
tracing::info!("Obtaining initial network topology");
|
||||
topology_refresher.try_refresh().await;
|
||||
|
||||
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
|
||||
@@ -686,13 +723,13 @@ where
|
||||
.wait_for_gateway(local_gateway, waiting_timeout)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
tracing::error!(
|
||||
"the gateway did not come back online within the specified timeout: {err}"
|
||||
);
|
||||
return Err(err.into());
|
||||
}
|
||||
} else {
|
||||
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
|
||||
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
@@ -700,7 +737,7 @@ where
|
||||
if !topology_config.disable_refreshing {
|
||||
// don't spawn the refresher if we don't want to be refreshing the topology.
|
||||
// only use the initial values obtained
|
||||
info!("Starting topology refresher...");
|
||||
tracing::info!("Starting topology refresher...");
|
||||
shutdown_tracker.try_spawn_named_with_shutdown(
|
||||
async move { topology_refresher.run().await },
|
||||
"TopologyRefresher",
|
||||
@@ -717,7 +754,7 @@ where
|
||||
input_sender: Sender<InputMessage>,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
) -> ClientStatsSender {
|
||||
info!("Starting statistics control...");
|
||||
tracing::info!("Starting statistics control...");
|
||||
StatisticsControl::create_and_start(
|
||||
config.debug.stats_reporting,
|
||||
user_agent
|
||||
@@ -732,10 +769,14 @@ where
|
||||
fn start_mix_traffic_controller(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
event_tx: EventSender,
|
||||
) -> (BatchMixMessageSender, ClientRequestSender) {
|
||||
info!("Starting mix traffic controller...");
|
||||
let (mut mix_traffic_controller, mix_tx, client_tx) =
|
||||
MixTrafficController::new(gateway_transceiver, shutdown_tracker.clone_shutdown_token());
|
||||
tracing::info!("Starting mix traffic controller...");
|
||||
let (mut mix_traffic_controller, mix_tx, client_tx) = MixTrafficController::new(
|
||||
gateway_transceiver,
|
||||
shutdown_tracker.clone_shutdown_token(),
|
||||
event_tx,
|
||||
);
|
||||
|
||||
shutdown_tracker.try_spawn_named(
|
||||
async move { mix_traffic_controller.run().await },
|
||||
@@ -799,7 +840,7 @@ where
|
||||
{
|
||||
// if client keys do not exist already, create and persist them
|
||||
if key_store.load_keys().await.is_err() {
|
||||
info!("could not find valid client keys - a new set will be generated");
|
||||
tracing::info!("could not find valid client keys - a new set will be generated");
|
||||
let mut rng = OsRng;
|
||||
let keys = if let Some(derivation_material) = derivation_material {
|
||||
ClientKeys::from_master_key(&mut rng, &derivation_material)
|
||||
@@ -846,7 +887,7 @@ where
|
||||
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
|
||||
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
|
||||
{
|
||||
info!("Starting nym client");
|
||||
tracing::info!("Starting nym client");
|
||||
|
||||
// derive (or load) client keys and gateway configuration
|
||||
let init_res = Self::initialise_keys_and_gateway(
|
||||
@@ -875,6 +916,9 @@ where
|
||||
// channels responsible for controlling real messages
|
||||
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
|
||||
|
||||
// channels responsible for event management
|
||||
let (event_sender, event_receiver) = mpsc::unbounded();
|
||||
|
||||
// channels responsible for controlling ack messages
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
let shared_topology_accessor =
|
||||
@@ -887,6 +931,8 @@ where
|
||||
None => nym_task::get_sdk_shutdown_tracker()?,
|
||||
};
|
||||
|
||||
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
|
||||
|
||||
// channels responsible for dealing with reply-related fun
|
||||
let (reply_controller_sender, reply_controller_receiver) =
|
||||
reply_controller::requests::new_control_channels();
|
||||
@@ -977,6 +1023,7 @@ where
|
||||
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
|
||||
gateway_transceiver,
|
||||
&shutdown_tracker.child_tracker(),
|
||||
EventSender(event_sender),
|
||||
);
|
||||
|
||||
// Channels that the websocket listener can use to signal downstream to the real traffic
|
||||
@@ -1026,8 +1073,8 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
debug!("Core client startup finished!");
|
||||
debug!("The address of this client is: {self_address}");
|
||||
tracing::debug!("Core client startup finished!");
|
||||
tracing::debug!("The address of this client is: {self_address}");
|
||||
|
||||
Ok(BaseClient {
|
||||
address: self_address,
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::StreamExt;
|
||||
|
||||
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
|
||||
|
||||
/// Launches and manages task events, propagating upwards what is not strictly internal.
|
||||
pub(crate) struct EventControl {
|
||||
parent_event_tx: Option<EventSender>,
|
||||
children_event_rx: EventReceiver,
|
||||
}
|
||||
|
||||
impl EventControl {
|
||||
pub(crate) fn new(
|
||||
parent_event_tx: Option<EventSender>,
|
||||
children_event_rx: EventReceiver,
|
||||
) -> Self {
|
||||
EventControl {
|
||||
parent_event_tx,
|
||||
children_event_rx,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_internal(event: MixnetClientEvent) -> bool {
|
||||
match event {
|
||||
MixnetClientEvent::Traffic(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(mut self) {
|
||||
while let Some(event) = self.children_event_rx.next().await {
|
||||
if let Some(parent_event_tx) = &self.parent_event_tx {
|
||||
if !Self::is_internal(event) {
|
||||
parent_event_tx.send(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,10 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::client::{
|
||||
base_client::{EventSender, MixnetClientEvent},
|
||||
mix_traffic::transceiver::GatewayTransceiver,
|
||||
};
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::ShutdownToken;
|
||||
@@ -22,6 +25,11 @@ const MAX_FAILURE_COUNT: usize = 100;
|
||||
// that's also disgusting.
|
||||
pub struct Empty;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum MixTrafficEvent {
|
||||
FailedSendingSphinx,
|
||||
}
|
||||
|
||||
pub struct MixTrafficController {
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
|
||||
@@ -33,12 +41,14 @@ pub struct MixTrafficController {
|
||||
consecutive_gateway_failure_count: usize,
|
||||
|
||||
shutdown_token: ShutdownToken,
|
||||
event_tx: EventSender,
|
||||
}
|
||||
|
||||
impl MixTrafficController {
|
||||
pub fn new<T>(
|
||||
gateway_transceiver: T,
|
||||
shutdown_token: ShutdownToken,
|
||||
event_tx: EventSender,
|
||||
) -> (
|
||||
MixTrafficController,
|
||||
BatchMixMessageSender,
|
||||
@@ -59,6 +69,7 @@ impl MixTrafficController {
|
||||
client_rx: client_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
shutdown_token,
|
||||
event_tx,
|
||||
},
|
||||
message_sender,
|
||||
client_sender,
|
||||
@@ -68,6 +79,7 @@ impl MixTrafficController {
|
||||
pub fn new_dynamic(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
shutdown_token: ShutdownToken,
|
||||
event_tx: EventSender,
|
||||
) -> (
|
||||
MixTrafficController,
|
||||
BatchMixMessageSender,
|
||||
@@ -83,6 +95,7 @@ impl MixTrafficController {
|
||||
client_rx: client_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
shutdown_token,
|
||||
event_tx,
|
||||
},
|
||||
message_sender,
|
||||
client_sender,
|
||||
@@ -155,6 +168,7 @@ impl MixTrafficController {
|
||||
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
|
||||
// Do we need to handle the embedded mixnet client case
|
||||
// separately?
|
||||
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
pub mod base_client;
|
||||
pub mod cover_traffic_stream;
|
||||
pub(crate) mod event_control;
|
||||
pub(crate) mod helpers;
|
||||
pub mod inbound_messages;
|
||||
pub mod key_manager;
|
||||
|
||||
@@ -12,6 +12,7 @@ license.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
futures.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
@@ -110,7 +110,6 @@ impl BuilderConfig {
|
||||
.credentials_mode(true)
|
||||
.with_remember_me(remember_me)
|
||||
.custom_topology_provider(self.custom_topology_provider);
|
||||
|
||||
#[cfg(unix)]
|
||||
let builder = builder.with_connection_fd_callback(self.connection_fd_callback);
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use nym_bandwidth_controller::{BandwidthController, BandwidthTicketProvider};
|
||||
use nym_credential_storage::ephemeral_storage::EphemeralCredentialStorage;
|
||||
use nym_sdk::{
|
||||
mixnet::{MixnetClient, MixnetClientBuilder},
|
||||
mixnet::{EventSender, MixnetClient, MixnetClientBuilder},
|
||||
NymNetworkDetails,
|
||||
};
|
||||
use nym_validator_client::{
|
||||
@@ -38,6 +39,7 @@ impl RegistrationClientBuilder {
|
||||
data_path: self.config.data_path.clone(),
|
||||
};
|
||||
let cancel_token = self.config.cancel_token.clone();
|
||||
let (event_tx, event_rx) = mpsc::unbounded();
|
||||
|
||||
let nyxd_client = get_nyxd_client(&self.config.network_env)?;
|
||||
|
||||
@@ -45,7 +47,8 @@ impl RegistrationClientBuilder {
|
||||
MixnetClient,
|
||||
Box<dyn BandwidthTicketProvider>,
|
||||
) = if let Some((mixnet_client_storage, credential_storage)) = storage {
|
||||
let builder = MixnetClientBuilder::new_with_storage(mixnet_client_storage);
|
||||
let builder = MixnetClientBuilder::new_with_storage(mixnet_client_storage)
|
||||
.event_tx(EventSender(event_tx));
|
||||
let mixnet_client = tokio::time::timeout(
|
||||
MIXNET_CLIENT_STARTUP_TIMEOUT,
|
||||
self.config.build_and_connect_mixnet_client(builder),
|
||||
@@ -55,7 +58,7 @@ impl RegistrationClientBuilder {
|
||||
Box::new(BandwidthController::new(credential_storage, nyxd_client));
|
||||
(mixnet_client, bandwidth_controller)
|
||||
} else {
|
||||
let builder = MixnetClientBuilder::new_ephemeral();
|
||||
let builder = MixnetClientBuilder::new_ephemeral().event_tx(EventSender(event_tx));
|
||||
let mixnet_client = tokio::time::timeout(
|
||||
MIXNET_CLIENT_STARTUP_TIMEOUT,
|
||||
self.config.build_and_connect_mixnet_client(builder),
|
||||
@@ -75,6 +78,7 @@ impl RegistrationClientBuilder {
|
||||
cancel_token,
|
||||
mixnet_client_address,
|
||||
bandwidth_controller,
|
||||
event_rx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use nym_bandwidth_controller::BandwidthTicketProvider;
|
||||
use nym_credentials_interface::TicketType;
|
||||
use nym_ip_packet_client::IprClientConnect;
|
||||
use nym_registration_common::AssignedAddresses;
|
||||
use nym_sdk::mixnet::{MixnetClient, Recipient};
|
||||
use nym_sdk::mixnet::{EventReceiver, MixnetClient, Recipient};
|
||||
|
||||
use crate::config::RegistrationClientConfig;
|
||||
|
||||
@@ -28,6 +28,7 @@ pub struct RegistrationClient {
|
||||
mixnet_client_address: Recipient,
|
||||
bandwidth_controller: Box<dyn BandwidthTicketProvider>,
|
||||
cancel_token: CancellationToken,
|
||||
event_rx: EventReceiver,
|
||||
}
|
||||
|
||||
impl RegistrationClient {
|
||||
@@ -58,6 +59,7 @@ impl RegistrationClient {
|
||||
entry_mixnet_gateway_ip,
|
||||
exit_mixnet_gateway_ip,
|
||||
},
|
||||
event_rx: self.event_rx,
|
||||
},
|
||||
)))
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use nym_authenticator_client::{AuthClientMixnetListenerHandle, AuthenticatorClient};
|
||||
use nym_bandwidth_controller::BandwidthTicketProvider;
|
||||
use nym_registration_common::{AssignedAddresses, GatewayData};
|
||||
use nym_sdk::mixnet::MixnetClient;
|
||||
use nym_sdk::mixnet::{EventReceiver, MixnetClient};
|
||||
|
||||
pub enum RegistrationResult {
|
||||
Mixnet(Box<MixnetRegistrationResult>),
|
||||
@@ -14,6 +14,7 @@ pub enum RegistrationResult {
|
||||
pub struct MixnetRegistrationResult {
|
||||
pub assigned_addresses: AssignedAddresses,
|
||||
pub mixnet_client: MixnetClient,
|
||||
pub event_rx: EventReceiver,
|
||||
}
|
||||
|
||||
pub struct WireguardRegistrationResult {
|
||||
|
||||
@@ -45,15 +45,21 @@ pub use native_client::MixnetClient;
|
||||
pub use native_client::MixnetClientSender;
|
||||
#[allow(deprecated)]
|
||||
pub use nym_client_core::client::{
|
||||
base_client::storage::{
|
||||
gateways_storage::{ActiveGateway, BadGateway, GatewayRegistration, GatewaysDetailsStore},
|
||||
Ephemeral, MixnetClientStorage, OnDiskPersistent,
|
||||
base_client::{
|
||||
storage::{
|
||||
gateways_storage::{
|
||||
ActiveGateway, BadGateway, GatewayRegistration, GatewaysDetailsStore,
|
||||
},
|
||||
Ephemeral, MixnetClientStorage, OnDiskPersistent,
|
||||
},
|
||||
EventReceiver, EventSender, MixnetClientEvent,
|
||||
},
|
||||
inbound_messages::InputMessage,
|
||||
key_manager::{
|
||||
persistence::{InMemEphemeralKeys, KeyStore, OnDiskKeys},
|
||||
ClientKeys,
|
||||
},
|
||||
mix_traffic::MixTrafficEvent,
|
||||
replies::reply_storage::{
|
||||
fs_backend::Backend as ReplyStorage, CombinedReplyStorage, Empty as EmptyReplyStorage,
|
||||
ReplyStorageBackend,
|
||||
|
||||
@@ -16,7 +16,7 @@ use nym_client_core::client::base_client::storage::helpers::{
|
||||
use nym_client_core::client::base_client::storage::{
|
||||
Ephemeral, GatewaysDetailsStore, MixnetClientStorage, OnDiskPersistent,
|
||||
};
|
||||
use nym_client_core::client::base_client::BaseClient;
|
||||
use nym_client_core::client::base_client::{BaseClient, EventSender};
|
||||
use nym_client_core::client::key_manager::persistence::KeyStore;
|
||||
use nym_client_core::client::{
|
||||
base_client::BaseClientBuilder, replies::reply_storage::ReplyStorageBackend,
|
||||
@@ -53,6 +53,7 @@ pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
|
||||
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
|
||||
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
|
||||
custom_shutdown: Option<ShutdownTracker>,
|
||||
event_tx: Option<EventSender>,
|
||||
force_tls: bool,
|
||||
user_agent: Option<UserAgent>,
|
||||
#[cfg(unix)]
|
||||
@@ -96,6 +97,7 @@ impl MixnetClientBuilder<OnDiskPersistent> {
|
||||
.await?,
|
||||
gateway_endpoint_config_path: None,
|
||||
custom_shutdown: None,
|
||||
event_tx: None,
|
||||
custom_gateway_transceiver: None,
|
||||
force_tls: false,
|
||||
user_agent: None,
|
||||
@@ -129,6 +131,7 @@ where
|
||||
custom_topology_provider: None,
|
||||
custom_gateway_transceiver: None,
|
||||
custom_shutdown: None,
|
||||
event_tx: None,
|
||||
force_tls: false,
|
||||
user_agent: None,
|
||||
#[cfg(unix)]
|
||||
@@ -152,6 +155,7 @@ where
|
||||
custom_topology_provider: self.custom_topology_provider,
|
||||
custom_gateway_transceiver: self.custom_gateway_transceiver,
|
||||
custom_shutdown: self.custom_shutdown,
|
||||
event_tx: self.event_tx,
|
||||
force_tls: self.force_tls,
|
||||
user_agent: self.user_agent,
|
||||
#[cfg(unix)]
|
||||
@@ -269,6 +273,13 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Use an externally managed shutdown mechanism.
|
||||
#[must_use]
|
||||
pub fn event_tx(mut self, event_tx: EventSender) -> Self {
|
||||
self.event_tx = Some(event_tx);
|
||||
self
|
||||
}
|
||||
|
||||
/// Attempt to wait for the selected gateway (if applicable) to come online if its currently not bonded.
|
||||
#[must_use]
|
||||
pub fn with_wait_for_gateway(mut self, wait_for_gateway: bool) -> Self {
|
||||
@@ -317,8 +328,12 @@ where
|
||||
|
||||
/// Construct a [`DisconnectedMixnetClient`] from the setup specified.
|
||||
pub fn build(self) -> Result<DisconnectedMixnetClient<S>> {
|
||||
let mut client =
|
||||
DisconnectedMixnetClient::new(self.config, self.socks5_config, self.storage)?;
|
||||
let mut client = DisconnectedMixnetClient::new(
|
||||
self.config,
|
||||
self.socks5_config,
|
||||
self.storage,
|
||||
self.event_tx,
|
||||
)?;
|
||||
|
||||
client.custom_gateway_transceiver = self.custom_gateway_transceiver;
|
||||
client.custom_topology_provider = self.custom_topology_provider;
|
||||
@@ -380,6 +395,9 @@ where
|
||||
/// Allows passing an externally controlled shutdown handle.
|
||||
custom_shutdown: Option<ShutdownTracker>,
|
||||
|
||||
/// Sender of mixnet client events to the SDK caller
|
||||
event_tx: Option<EventSender>,
|
||||
|
||||
user_agent: Option<UserAgent>,
|
||||
|
||||
/// Callback on the websocket fd as soon as the connection has been established
|
||||
@@ -415,6 +433,7 @@ where
|
||||
config: Config,
|
||||
socks5_config: Option<Socks5>,
|
||||
storage: S,
|
||||
event_tx: Option<EventSender>,
|
||||
) -> Result<DisconnectedMixnetClient<S>> {
|
||||
// don't create dkg client for the bandwidth controller if credentials are disabled
|
||||
let dkg_query_client = if config.enabled_credentials_mode {
|
||||
@@ -443,6 +462,7 @@ where
|
||||
wait_for_gateway: false,
|
||||
force_tls: false,
|
||||
custom_shutdown: None,
|
||||
event_tx,
|
||||
user_agent: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
@@ -699,6 +719,9 @@ where
|
||||
}
|
||||
};
|
||||
base_builder = base_builder.with_shutdown(shutdown_tracker);
|
||||
if let Some(event_tx) = self.event_tx {
|
||||
base_builder = base_builder.with_event_tx(event_tx);
|
||||
}
|
||||
|
||||
if let Some(gateway_transceiver) = self.custom_gateway_transceiver {
|
||||
base_builder = base_builder.with_gateway_transceiver(gateway_transceiver);
|
||||
|
||||
Reference in New Issue
Block a user