Compare commits
2 Commits
gear
...
fig-release
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ecc2a8dda | |||
| f9659ef42c |
@@ -50,7 +50,7 @@ impl AuthClientMixnetListener {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Self {
|
||||
async fn run(mut self) {
|
||||
let mixnet_cancel_token = self.mixnet_client.cancellation_token();
|
||||
self.shutdown_token.run_until_cancelled(async {
|
||||
loop {
|
||||
@@ -95,12 +95,8 @@ impl AuthClientMixnetListener {
|
||||
tracing::debug!("AuthClientMixnetListener is shutting down");
|
||||
}).await;
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
// Disconnects the mixnet client and effectively drop itself, since it doesn't work without one, and reconnecting isn't supported
|
||||
pub async fn disconnect_mixnet_client(self) {
|
||||
if !self.mixnet_client.cancellation_token().is_cancelled() {
|
||||
tracing::debug!("AuthClientMixnetListener: Disconnect mixnet client");
|
||||
if !mixnet_cancel_token.is_cancelled() {
|
||||
self.mixnet_client.disconnect().await;
|
||||
}
|
||||
}
|
||||
@@ -128,7 +124,7 @@ pub struct AuthClientMixnetListenerHandle {
|
||||
message_sender: MixnetMessageInputSender,
|
||||
cancellation_token: CancellationToken,
|
||||
mixnet_cancellation_token: CancellationToken,
|
||||
handle: JoinHandle<AuthClientMixnetListener>,
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl AuthClientMixnetListenerHandle {
|
||||
@@ -148,13 +144,8 @@ impl AuthClientMixnetListenerHandle {
|
||||
// If shutdown was externally called, that call is a no-op
|
||||
// If we're only stopping this, it is very much needed
|
||||
self.cancellation_token.cancel();
|
||||
match self.handle.await {
|
||||
Ok(auth_client_mixnet_listener) => {
|
||||
auth_client_mixnet_listener.disconnect_mixnet_client().await;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}");
|
||||
}
|
||||
if let Err(e) = self.handle.await {
|
||||
tracing::error!("Error waiting for auth clients mixnet listener to stop: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -765,7 +765,7 @@ async fn connect_exit(
|
||||
);
|
||||
// The IPR supports cancellation, but it's unused in the gateway probe
|
||||
let cancel_token = CancellationToken::new();
|
||||
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token).await;
|
||||
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token);
|
||||
|
||||
let maybe_ip_pair = ipr_client.connect(exit_router_address).await;
|
||||
let mixnet_client = ipr_client.into_mixnet_client();
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct IprClientConnect {
|
||||
}
|
||||
|
||||
impl IprClientConnect {
|
||||
pub async fn new(mixnet_client: MixnetClient, cancel_token: CancellationToken) -> Self {
|
||||
pub fn new(mixnet_client: MixnetClient, cancel_token: CancellationToken) -> Self {
|
||||
Self {
|
||||
mixnet_client,
|
||||
connected: ConnectionState::Disconnected,
|
||||
|
||||
@@ -9,6 +9,7 @@ use nym_credentials_interface::TicketType;
|
||||
use nym_ip_packet_client::IprClientConnect;
|
||||
use nym_registration_common::AssignedAddresses;
|
||||
use nym_sdk::mixnet::{EventReceiver, MixnetClient, Recipient};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::config::RegistrationClientConfig;
|
||||
|
||||
@@ -34,23 +35,49 @@ pub struct RegistrationClient {
|
||||
event_rx: EventReceiver,
|
||||
}
|
||||
|
||||
// Bundle of an actual error and the underlying mixnet client so it can be shutdown correctly if needed
|
||||
struct RegistrationError {
|
||||
mixnet_client: Option<MixnetClient>,
|
||||
source: crate::RegistrationClientError,
|
||||
}
|
||||
|
||||
impl RegistrationClient {
|
||||
async fn register_mix_exit(self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
async fn register_mix_exit(self) -> Result<RegistrationResult, RegistrationError> {
|
||||
let entry_mixnet_gateway_ip = self.config.entry.node.ip_address;
|
||||
|
||||
let exit_mixnet_gateway_ip = self.config.exit.node.ip_address;
|
||||
|
||||
let ipr_address = self.config.exit.node.ipr_address.ok_or(
|
||||
RegistrationClientError::NoIpPacketRouterAddress {
|
||||
node_id: self.config.exit.node.identity.to_base58_string(),
|
||||
},
|
||||
)?;
|
||||
let Some(ipr_address) = self.config.exit.node.ipr_address else {
|
||||
return Err(RegistrationError {
|
||||
mixnet_client: Some(self.mixnet_client),
|
||||
source: RegistrationClientError::NoIpPacketRouterAddress {
|
||||
node_id: self.config.exit.node.identity.to_base58_string(),
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
let mut ipr_client =
|
||||
IprClientConnect::new(self.mixnet_client, self.cancel_token.clone()).await;
|
||||
let interface_addresses = ipr_client
|
||||
.connect(ipr_address)
|
||||
IprClientConnect::new(self.mixnet_client, self.cancel_token.child_token());
|
||||
|
||||
let interface_addresses = match self
|
||||
.cancel_token
|
||||
.run_until_cancelled(ipr_client.connect(ipr_address))
|
||||
.await
|
||||
.map_err(RegistrationClientError::ConnectToIpPacketRouter)?;
|
||||
{
|
||||
Some(Ok(addr)) => addr,
|
||||
Some(Err(e)) => {
|
||||
return Err(RegistrationError {
|
||||
mixnet_client: Some(ipr_client.into_mixnet_client()),
|
||||
source: RegistrationClientError::ConnectToIpPacketRouter(e),
|
||||
});
|
||||
}
|
||||
None => {
|
||||
return Err(RegistrationError {
|
||||
mixnet_client: Some(ipr_client.into_mixnet_client()),
|
||||
source: RegistrationClientError::Cancelled,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
Ok(RegistrationResult::Mixnet(Box::new(
|
||||
MixnetRegistrationResult {
|
||||
@@ -67,18 +94,24 @@ impl RegistrationClient {
|
||||
)))
|
||||
}
|
||||
|
||||
async fn register_wg(self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
let entry_auth_address = self.config.entry.node.authenticator_address.ok_or(
|
||||
RegistrationClientError::AuthenticationNotPossible {
|
||||
node_id: self.config.entry.node.identity.to_base58_string(),
|
||||
},
|
||||
)?;
|
||||
async fn register_wg(self) -> Result<RegistrationResult, RegistrationError> {
|
||||
let Some(entry_auth_address) = self.config.entry.node.authenticator_address else {
|
||||
return Err(RegistrationError {
|
||||
mixnet_client: Some(self.mixnet_client),
|
||||
source: RegistrationClientError::AuthenticationNotPossible {
|
||||
node_id: self.config.entry.node.identity.to_base58_string(),
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
let exit_auth_address = self.config.exit.node.authenticator_address.ok_or(
|
||||
RegistrationClientError::AuthenticationNotPossible {
|
||||
node_id: self.config.exit.node.identity.to_base58_string(),
|
||||
},
|
||||
)?;
|
||||
let Some(exit_auth_address) = self.config.exit.node.authenticator_address else {
|
||||
return Err(RegistrationError {
|
||||
mixnet_client: Some(self.mixnet_client),
|
||||
source: RegistrationClientError::AuthenticationNotPossible {
|
||||
node_id: self.config.exit.node.identity.to_base58_string(),
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
let entry_version = self.config.entry.node.version;
|
||||
tracing::debug!("Entry gateway version: {entry_version}");
|
||||
@@ -87,8 +120,10 @@ impl RegistrationClient {
|
||||
|
||||
// Start the auth client mixnet listener, which will listen for incoming messages from the
|
||||
// mixnet and rebroadcast them to the auth clients.
|
||||
// From this point on, we don't need to care about the mixnet client anymore
|
||||
let mixnet_listener =
|
||||
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.clone()).start();
|
||||
AuthClientMixnetListener::new(self.mixnet_client, self.cancel_token.child_token())
|
||||
.start();
|
||||
|
||||
let mut entry_auth_client = AuthenticatorClient::new(
|
||||
mixnet_listener.subscribe(),
|
||||
@@ -115,24 +150,33 @@ impl RegistrationClient {
|
||||
let exit_fut = exit_auth_client
|
||||
.register_wireguard(&*self.bandwidth_controller, TicketType::V1WireguardExit);
|
||||
|
||||
let (entry, exit) = Box::pin(async { tokio::join!(entry_fut, exit_fut) }).await;
|
||||
let (entry, exit) = Box::pin(
|
||||
self.cancel_token
|
||||
.run_until_cancelled(async { tokio::join!(entry_fut, exit_fut) }),
|
||||
)
|
||||
.await
|
||||
.ok_or(RegistrationError {
|
||||
mixnet_client: None,
|
||||
source: RegistrationClientError::Cancelled,
|
||||
})?;
|
||||
|
||||
let entry =
|
||||
entry.map_err(
|
||||
|source| RegistrationClientError::EntryGatewayRegisterWireguard {
|
||||
gateway_id: self.config.entry.node.identity.to_base58_string(),
|
||||
authenticator_address: Box::new(entry_auth_address),
|
||||
source: Box::new(source),
|
||||
},
|
||||
)?;
|
||||
let exit =
|
||||
exit.map_err(
|
||||
|source| RegistrationClientError::ExitGatewayRegisterWireguard {
|
||||
gateway_id: self.config.exit.node.identity.to_base58_string(),
|
||||
authenticator_address: Box::new(exit_auth_address),
|
||||
source: Box::new(source),
|
||||
},
|
||||
)?;
|
||||
let entry = entry.map_err(|source| RegistrationError {
|
||||
mixnet_client: None,
|
||||
source: RegistrationClientError::EntryGatewayRegisterWireguard {
|
||||
gateway_id: self.config.entry.node.identity.to_base58_string(),
|
||||
authenticator_address: Box::new(entry_auth_address),
|
||||
source: Box::new(source),
|
||||
},
|
||||
})?;
|
||||
|
||||
let exit = exit.map_err(|source| RegistrationError {
|
||||
mixnet_client: None,
|
||||
source: RegistrationClientError::EntryGatewayRegisterWireguard {
|
||||
gateway_id: self.config.exit.node.identity.to_base58_string(),
|
||||
authenticator_address: Box::new(exit_auth_address),
|
||||
source: Box::new(source),
|
||||
},
|
||||
})?;
|
||||
|
||||
Ok(RegistrationResult::Wireguard(Box::new(
|
||||
WireguardRegistrationResult {
|
||||
@@ -147,16 +191,23 @@ impl RegistrationClient {
|
||||
}
|
||||
|
||||
pub async fn register(self) -> Result<RegistrationResult, RegistrationClientError> {
|
||||
self.cancel_token
|
||||
.clone()
|
||||
.run_until_cancelled(async {
|
||||
if self.config.two_hops {
|
||||
self.register_wg().await
|
||||
} else {
|
||||
self.register_mix_exit().await
|
||||
let registration_result = if self.config.two_hops {
|
||||
self.register_wg().await
|
||||
} else {
|
||||
self.register_mix_exit().await
|
||||
};
|
||||
|
||||
// If we failed to register, and we were the owner of the mixnet client, shut it down
|
||||
match registration_result {
|
||||
Ok(result) => Ok(result),
|
||||
Err(error) => {
|
||||
debug!("Registration failed");
|
||||
if let Some(mixnet_client) = error.mixnet_client {
|
||||
debug!("Shutting down mixnet client");
|
||||
mixnet_client.disconnect().await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.ok_or(RegistrationClientError::Cancelled)?
|
||||
Err(error.source)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user