Compare commits

...

8 Commits

Author SHA1 Message Date
Jon Häggblad 33e69f9eb9 Tidy up 2023-08-22 10:30:12 +02:00
Jon Häggblad 0b7b10e591 wip: work on tidy up 2023-08-21 18:01:26 +02:00
Jon Häggblad 1345a78bf7 wip: simplify 2023-08-21 16:34:17 +02:00
Jon Häggblad 040877ae69 remove on drop 2023-08-21 12:22:55 +02:00
Jon Häggblad a2995bbc8f WIP: working solution 2023-08-21 12:19:55 +02:00
Jon Häggblad 958cdfd22c wip 2023-08-18 16:46:05 +02:00
Jon Häggblad 9820ad3bea wip: unconditionally disconnect existing client 2023-08-18 12:42:39 +02:00
Jon Häggblad 19e26f55ea wip 2023-08-17 15:26:13 +02:00
13 changed files with 188 additions and 17 deletions
Generated
+1
View File
@@ -3821,6 +3821,7 @@ dependencies = [
"nym-explorer-api-requests",
"nym-gateway-client",
"nym-gateway-requests",
"nym-mixnet-contract-common",
"nym-network-defaults",
"nym-nonexhaustive-delayqueue",
"nym-pemstore",
+1
View File
@@ -164,6 +164,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> {
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(args.latency_based_selection),
);
dbg!(&gateway_setup);
// Load and potentially override config
let config = override_config(Config::new(id), OverrideConfig::from(args.clone()));
+2
View File
@@ -45,6 +45,8 @@ nym-task = { path = "../task" }
nym-credential-storage = { path = "../credential-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.11"
features = ["time"]
@@ -74,8 +74,22 @@ impl NymApiTopologyProvider {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
Ok(gateways) => {
// dbg!(&gateways);
gateways
}
};
let mut g = gateways[0].clone();
g.gateway = nym_mixnet_contract_common::Gateway {
host: "127.0.0.1".to_string(),
mix_port: 1789,
clients_port: 9000,
location: "local".to_string(),
sphinx_key: "9PgtqBP8Xo3icVvvgrtxWKcNEFGnvDAXd7tWHmpA5UPR".to_string(),
identity_key: "GapWkU8o3goXH5sjKw7TWGE3NwLKq7gBqzdD77qahC28".to_string(),
version: "1.1.25".to_string(),
};
let gateways = vec![g];
let topology = nym_topology_from_detailed(mixnodes, gateways)
.filter_system_version(&self.client_version);
+12
View File
@@ -68,6 +68,18 @@ pub async fn current_gateways<R: Rng>(
log::trace!("Fetching list of gateways from: {nym_api}");
let gateways = client.get_cached_gateways().await?;
// dbg!(&gateways);
let mut g = gateways[0].clone();
g.gateway = nym_mixnet_contract_common::Gateway {
host: "127.0.0.1".to_string(),
mix_port: 1789,
clients_port: 9000,
location: "local".to_string(),
sphinx_key: "9PgtqBP8Xo3icVvvgrtxWKcNEFGnvDAXd7tWHmpA5UPR".to_string(),
identity_key: "GapWkU8o3goXH5sjKw7TWGE3NwLKq7gBqzdD77qahC28".to_string(),
version: "1.1.25".to_string(),
};
let gateways = vec![g];
let valid_gateways = gateways
.into_iter()
.filter_map(|gateway| gateway.try_into().ok())
+9
View File
@@ -97,6 +97,7 @@ impl InitialisationDetails {
}
}
#[derive(Debug)]
pub enum GatewaySetup {
/// The gateway specification MUST BE loaded from the underlying storage.
MustLoad,
@@ -307,6 +308,7 @@ where
K::StorageError: Send + Sync + 'static,
D::StorageError: Send + Sync + 'static,
{
dbg!(&overwrite_data);
// I don't like how we can't deal with this variant in the match below, but we need to take ownership of internal values.
if let GatewaySetup::ReuseConnection {
authenticated_ephemeral_client,
@@ -331,6 +333,7 @@ where
Ok(loaded_keys) => {
match &setup {
GatewaySetup::MustLoad => {
println!("GatewaySetup::MustLoad");
// get EVERYTHING from the storage
let details = loaded_details?;
ensure_valid_details(&details, &loaded_keys)?;
@@ -339,6 +342,7 @@ where
return Ok(InitialisationDetails::new(details.into(), loaded_keys).into());
}
GatewaySetup::Predefined { details } => {
println!("GatewaySetup::Predefined");
// we already have defined gateway details AND a shared key
ensure_valid_details(details, &loaded_keys)?;
@@ -352,6 +356,7 @@ where
);
}
GatewaySetup::Specified { gateway_identity } => {
println!("GatewaySetup::Specified");
// if that data was already stored...
if let Ok(existing_gateway) = loaded_details {
ensure_valid_details(&existing_gateway, &loaded_keys)?;
@@ -379,7 +384,9 @@ where
}
}
GatewaySetup::New { .. } => {
println!("GatewaySetup::New");
if let Ok(existing_gateway) = loaded_details {
println!("GatewaySetup::New - existing_gateway");
ensure_valid_details(&existing_gateway, &loaded_keys)?;
return Ok(InitialisationDetails::new(
existing_gateway.into(),
@@ -391,6 +398,7 @@ where
// we didn't get full details from the store and we have loaded some keys
// so we can only continue if we're allowed to overwrite keys
if overwrite_data {
println!("GatewaySetup::New - overwrite_data");
ManagedKeys::generate_new(&mut rng)
} else {
return Err(ClientCoreError::ForbiddenKeyOverwrite);
@@ -456,6 +464,7 @@ where
{
let mut rng = OsRng;
let gateways = current_gateways(&mut rng, validator_servers.unwrap_or_default()).await?;
dbg!(&gateways);
setup_gateway_from(
setup,
@@ -25,6 +25,7 @@ use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tungstenite::protocol::Message;
@@ -68,6 +69,31 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
shutdown: TaskClient,
}
impl<C, St> fmt::Debug for GatewayClient<C, St> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GatewayClient")
.field("authenticated", &self.authenticated)
.field("disabled_credentials_mode", &self.disabled_credentials_mode)
.field("bandwidth_remaining", &self.bandwidth_remaining)
.field("gateway_address", &self.gateway_address)
.field("gateway_identity", &self.gateway_identity)
.field("local_identity", &self.local_identity)
.field("shared_key", &self.shared_key)
// .field("connection", &self.connection)
.field("packet_router", &self.packet_router)
.field("response_timeout_duration", &self.response_timeout_duration)
// .field("bandwidth_controller", &self.bandwidth_controller)
.field(
"should_reconnect_on_failure",
&self.should_reconnect_on_failure,
)
.field("reconnection_attempts", &self.reconnection_attempts)
.field("reconnection_backoff", &self.reconnection_backoff)
.field("shutdown", &self.shutdown)
.finish()
}
}
impl<C, St> GatewayClient<C, St> {
// TODO: put it all in a Config struct
#[allow(clippy::too_many_arguments)]
@@ -1,13 +1,17 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node::client_handling::websocket::message_receiver::MixMessageSender;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use nym_sphinx::DestinationAddressBytes;
use std::sync::Arc;
use super::websocket::message_receiver::{IsActiveRequestSender, MixMessageSender};
#[derive(Clone)]
pub(crate) struct ActiveClientsStore(Arc<DashMap<DestinationAddressBytes, MixMessageSender>>);
pub(crate) struct ActiveClientsStore(
Arc<DashMap<DestinationAddressBytes, (MixMessageSender, IsActiveRequestSender)>>,
);
impl ActiveClientsStore {
/// Creates new instance of `ActiveClientsStore` to store in-memory handles to all currently connected clients.
@@ -21,13 +25,16 @@ impl ActiveClientsStore {
/// # Arguments
///
/// * `client`: address of the client for which to obtain the handle.
pub(crate) fn get(&self, client: DestinationAddressBytes) -> Option<MixMessageSender> {
pub(crate) fn get(
&self,
client: DestinationAddressBytes,
) -> Option<(MixMessageSender, IsActiveRequestSender)> {
let entry = self.0.get(&client)?;
let handle = entry.value();
// if the entry is stale, remove it from the map
// if handle.is_valid() {
if !handle.is_closed() {
if !handle.0.is_closed() {
Some(handle.clone())
} else {
// drop the reference to the map to prevent deadlocks
@@ -52,8 +59,13 @@ impl ActiveClientsStore {
///
/// * `client`: address of the client for which to insert the handle.
/// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket
pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) {
self.0.insert(client, handle);
pub(crate) fn insert(
&self,
client: DestinationAddressBytes,
handle: MixMessageSender,
is_active_sender: mpsc::UnboundedSender<oneshot::Sender<bool>>,
) {
self.0.insert(client, (handle, is_active_sender));
}
/// Get number of active clients in store
@@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::node::client_handling::websocket::connection_handler::{ClientDetails, FreshHandler};
use crate::node::client_handling::websocket::message_receiver::MixMessageReceiver;
use crate::node::client_handling::websocket::message_receiver::{
IsActiveRequestReceiver, MixMessageReceiver,
};
use crate::node::storage::error::StorageError;
use crate::node::storage::Storage;
use futures::StreamExt;
@@ -97,6 +99,7 @@ pub(crate) struct AuthenticatedHandler<R, S, St> {
inner: FreshHandler<R, S, St>,
client: ClientDetails,
mix_receiver: MixMessageReceiver,
is_active_request_receiver: IsActiveRequestReceiver,
}
// explicitly remove handle from the global store upon being dropped
@@ -127,11 +130,13 @@ where
fresh: FreshHandler<R, S, St>,
client: ClientDetails,
mix_receiver: MixMessageReceiver,
is_active_request_receiver: IsActiveRequestReceiver,
) -> Self {
AuthenticatedHandler {
inner: fresh,
client,
mix_receiver,
is_active_request_receiver,
}
}
@@ -351,18 +356,38 @@ where
}
}
/// Handles pong message received from the client.
/// If the client is still active, the handler that requested the ping will receive a reply.
async fn handle_pong(&mut self, msg: Vec<u8>) {
if let Ok(msg) = msg.try_into() {
let msg = u64::from_be_bytes(msg);
debug!("Received pong from client: {}", msg);
if let Some(tx) = self.inner.is_active_ping_pending_replies.remove(&msg) {
debug!("Reporting back");
if let Err(err) = tx.send(true) {
warn!("Failed to send pong reply back to the requesting handler: {err}");
}
}
}
}
/// Attempts to handle websocket message received from the connected client.
///
/// # Arguments
///
/// * `raw_request`: raw received websocket message.
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
println!("handle_request");
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from
// desktop nym-client websocket as I've manually handled everything there
match raw_request {
Message::Binary(bin_msg) => Some(self.handle_binary(bin_msg).await),
Message::Text(text_msg) => Some(self.handle_text(text_msg).await),
Message::Pong(msg) => {
self.handle_pong(msg).await;
None
}
_ => None,
}
}
@@ -381,8 +406,23 @@ where
tokio::select! {
_ = shutdown.recv() => {
log::trace!("client_handling::AuthenticatedHandler: received shutdown");
}
},
tx = self.is_active_request_receiver.next() => {
match tx {
None => debug!("is_active_request_receiver was closed!"),
Some(reply_tx) => {
let tag: u64 = rand::thread_rng().gen();
debug!("Got request to ping our connection: {}", tag);
if let Err(err) = self.inner.send_websocket_message(Message::Ping(tag.to_be_bytes().to_vec())).await {
warn!("Failed to send ping to client: {err}. Assuming the connection is dead.");
break;
}
self.inner.is_active_ping_pending_replies.insert(tag, reply_tx);
}
};
},
socket_msg = self.inner.read_websocket_message() => {
println!("socket_msg");
let socket_msg = match socket_msg {
None => break,
Some(Ok(socket_msg)) => socket_msg,
@@ -6,8 +6,10 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
use crate::node::client_handling::websocket::connection_handler::{
AuthenticatedHandler, ClientDetails, InitialAuthResult, SocketStream,
};
use crate::node::client_handling::websocket::message_receiver::IsActiveResultSender;
use crate::node::storage::error::StorageError;
use crate::node::storage::Storage;
use futures::channel::oneshot;
use futures::{channel::mpsc, SinkExt, StreamExt};
use log::*;
use nym_crypto::asymmetric::identity;
@@ -22,8 +24,10 @@ use nym_gateway_requests::{BinaryResponse, PROTOCOL_VERSION};
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_sphinx::DestinationAddressBytes;
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
@@ -75,6 +79,10 @@ pub(crate) struct FreshHandler<R, S, St> {
pub(crate) socket_connection: SocketStream<S>,
pub(crate) storage: St,
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
// Occasionally the handler is requested to ping the connected client for confirm that it's
// active, such as when a duplicate connection is detected. This hashmap stores the oneshot
// senders that are used to return the result of the ping to the handler requesting the ping.
pub(crate) is_active_ping_pending_replies: HashMap<u64, IsActiveResultSender>,
}
impl<R, S, St> FreshHandler<R, S, St>
@@ -106,6 +114,7 @@ where
local_identity,
storage,
coconut_verifier,
is_active_ping_pending_replies: HashMap::new(),
}
}
@@ -418,8 +427,39 @@ where
let encrypted_address = EncryptedAddressBytes::try_from_base58_string(enc_address)?;
let iv = IV::try_from_base58_string(iv)?;
if self.active_clients_store.get(address).is_some() {
return Err(InitialAuthenticationError::DuplicateConnection);
if let Some((__, mut is_active_request_tx)) = self.active_clients_store.get(address) {
log::warn!("Detected duplicate connection for client: {}", address);
// Ask the other connection to ping if they are still active.
// Use a oneshot channel to return the result to us
let (ping_result_sender, ping_result_receiver) = oneshot::channel::<bool>();
log::debug!("Asking other connection to ping the connection client");
is_active_request_tx.send(ping_result_sender).await.ok();
// Wait for the reply
match tokio::time::timeout(Duration::from_millis(1000), ping_result_receiver).await {
Ok(Ok(res)) => {
if res {
// The other handled reported a positive reply, so we have to assume it's
// still active and disconnect this connection.
log::info!("Other handler reports it is active");
return Err(InitialAuthenticationError::DuplicateConnection);
} else {
log::debug!("Other handler reports it is not active");
self.active_clients_store.disconnect(address);
}
}
Ok(Err(_)) => {
// Other channel failed to reply (the channel sender probably dropped)
log::info!("Other connection failed to reply, disconnecting it in favour of this new connection");
self.active_clients_store.disconnect(address);
}
Err(_) => {
// Timeout waiting for reply
log::warn!("Other connection timed out, disconnecting it in favour of this new connection");
self.active_clients_store.disconnect(address);
}
}
}
let shared_keys = self
@@ -504,7 +544,8 @@ where
let remote_address = remote_identity.derive_destination_address();
if self.active_clients_store.get(remote_address).is_some() {
return Err(InitialAuthenticationError::DuplicateConnection);
println!("duplicate connection when registering");
// return Err(InitialAuthenticationError::DuplicateConnection);
}
let shared_keys = self.perform_registration_handshake(init_data).await?;
@@ -606,12 +647,19 @@ where
}
return if let Some(client_details) = auth_result.client_details {
self.active_clients_store
.insert(client_details.address, mix_sender);
// Channel for handlers to ask other handlers if they are still active.
let (is_active_request_sender, is_active_request_receiver) =
mpsc::unbounded();
self.active_clients_store.insert(
client_details.address,
mix_sender,
is_active_request_sender,
);
Some(AuthenticatedHandler::upgrade(
self,
client_details,
mix_receiver,
is_active_request_receiver,
))
} else {
None
@@ -100,7 +100,7 @@ pub(crate) async fn handle_connection<R, S, St>(
.await
{
None => {
trace!("received shutdown signal while performing initial authetnication");
trace!("received shutdown signal while performing initial authentication");
return;
}
Some(None) => {
@@ -1,7 +1,13 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
pub(crate) type MixMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
pub(crate) type MixMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
// Channels used for one handler to requester another handler to check that the client is still
// active. The result is then passed back to the requesting handler in the oneshot channel.
pub(crate) type IsActiveRequestSender = mpsc::UnboundedSender<IsActiveResultSender>;
pub(crate) type IsActiveRequestReceiver = mpsc::UnboundedReceiver<IsActiveResultSender>;
pub(crate) type IsActiveResultSender = oneshot::Sender<bool>;
@@ -70,7 +70,7 @@ impl<St: Storage> ConnectionHandler<St> {
}
fn update_clients_store_cache_entry(&mut self, client_address: DestinationAddressBytes) {
if let Some(client_sender) = self.active_clients_store.get(client_address) {
if let Some((client_sender, _)) = self.active_clients_store.get(client_address) {
self.clients_store_cache
.insert(client_address, client_sender);
}