Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 33e69f9eb9 | |||
| 0b7b10e591 | |||
| 1345a78bf7 | |||
| 040877ae69 | |||
| a2995bbc8f | |||
| 958cdfd22c | |||
| 9820ad3bea | |||
| 19e26f55ea |
Generated
+1
@@ -3821,6 +3821,7 @@ dependencies = [
|
||||
"nym-explorer-api-requests",
|
||||
"nym-gateway-client",
|
||||
"nym-gateway-requests",
|
||||
"nym-mixnet-contract-common",
|
||||
"nym-network-defaults",
|
||||
"nym-nonexhaustive-delayqueue",
|
||||
"nym-pemstore",
|
||||
|
||||
@@ -164,6 +164,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> {
|
||||
user_chosen_gateway_id.map(|id| id.to_base58_string()),
|
||||
Some(args.latency_based_selection),
|
||||
);
|
||||
dbg!(&gateway_setup);
|
||||
|
||||
// Load and potentially override config
|
||||
let config = override_config(Config::new(id), OverrideConfig::from(args.clone()));
|
||||
|
||||
@@ -45,6 +45,8 @@ nym-task = { path = "../task" }
|
||||
nym-credential-storage = { path = "../credential-storage" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||
version = "0.1.11"
|
||||
features = ["time"]
|
||||
|
||||
@@ -74,8 +74,22 @@ impl NymApiTopologyProvider {
|
||||
error!("failed to get network gateways - {err}");
|
||||
return None;
|
||||
}
|
||||
Ok(gateways) => gateways,
|
||||
Ok(gateways) => {
|
||||
// dbg!(&gateways);
|
||||
gateways
|
||||
}
|
||||
};
|
||||
let mut g = gateways[0].clone();
|
||||
g.gateway = nym_mixnet_contract_common::Gateway {
|
||||
host: "127.0.0.1".to_string(),
|
||||
mix_port: 1789,
|
||||
clients_port: 9000,
|
||||
location: "local".to_string(),
|
||||
sphinx_key: "9PgtqBP8Xo3icVvvgrtxWKcNEFGnvDAXd7tWHmpA5UPR".to_string(),
|
||||
identity_key: "GapWkU8o3goXH5sjKw7TWGE3NwLKq7gBqzdD77qahC28".to_string(),
|
||||
version: "1.1.25".to_string(),
|
||||
};
|
||||
let gateways = vec![g];
|
||||
|
||||
let topology = nym_topology_from_detailed(mixnodes, gateways)
|
||||
.filter_system_version(&self.client_version);
|
||||
|
||||
@@ -68,6 +68,18 @@ pub async fn current_gateways<R: Rng>(
|
||||
log::trace!("Fetching list of gateways from: {nym_api}");
|
||||
|
||||
let gateways = client.get_cached_gateways().await?;
|
||||
// dbg!(&gateways);
|
||||
let mut g = gateways[0].clone();
|
||||
g.gateway = nym_mixnet_contract_common::Gateway {
|
||||
host: "127.0.0.1".to_string(),
|
||||
mix_port: 1789,
|
||||
clients_port: 9000,
|
||||
location: "local".to_string(),
|
||||
sphinx_key: "9PgtqBP8Xo3icVvvgrtxWKcNEFGnvDAXd7tWHmpA5UPR".to_string(),
|
||||
identity_key: "GapWkU8o3goXH5sjKw7TWGE3NwLKq7gBqzdD77qahC28".to_string(),
|
||||
version: "1.1.25".to_string(),
|
||||
};
|
||||
let gateways = vec![g];
|
||||
let valid_gateways = gateways
|
||||
.into_iter()
|
||||
.filter_map(|gateway| gateway.try_into().ok())
|
||||
|
||||
@@ -97,6 +97,7 @@ impl InitialisationDetails {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum GatewaySetup {
|
||||
/// The gateway specification MUST BE loaded from the underlying storage.
|
||||
MustLoad,
|
||||
@@ -307,6 +308,7 @@ where
|
||||
K::StorageError: Send + Sync + 'static,
|
||||
D::StorageError: Send + Sync + 'static,
|
||||
{
|
||||
dbg!(&overwrite_data);
|
||||
// I don't like how we can't deal with this variant in the match below, but we need to take ownership of internal values.
|
||||
if let GatewaySetup::ReuseConnection {
|
||||
authenticated_ephemeral_client,
|
||||
@@ -331,6 +333,7 @@ where
|
||||
Ok(loaded_keys) => {
|
||||
match &setup {
|
||||
GatewaySetup::MustLoad => {
|
||||
println!("GatewaySetup::MustLoad");
|
||||
// get EVERYTHING from the storage
|
||||
let details = loaded_details?;
|
||||
ensure_valid_details(&details, &loaded_keys)?;
|
||||
@@ -339,6 +342,7 @@ where
|
||||
return Ok(InitialisationDetails::new(details.into(), loaded_keys).into());
|
||||
}
|
||||
GatewaySetup::Predefined { details } => {
|
||||
println!("GatewaySetup::Predefined");
|
||||
// we already have defined gateway details AND a shared key
|
||||
ensure_valid_details(details, &loaded_keys)?;
|
||||
|
||||
@@ -352,6 +356,7 @@ where
|
||||
);
|
||||
}
|
||||
GatewaySetup::Specified { gateway_identity } => {
|
||||
println!("GatewaySetup::Specified");
|
||||
// if that data was already stored...
|
||||
if let Ok(existing_gateway) = loaded_details {
|
||||
ensure_valid_details(&existing_gateway, &loaded_keys)?;
|
||||
@@ -379,7 +384,9 @@ where
|
||||
}
|
||||
}
|
||||
GatewaySetup::New { .. } => {
|
||||
println!("GatewaySetup::New");
|
||||
if let Ok(existing_gateway) = loaded_details {
|
||||
println!("GatewaySetup::New - existing_gateway");
|
||||
ensure_valid_details(&existing_gateway, &loaded_keys)?;
|
||||
return Ok(InitialisationDetails::new(
|
||||
existing_gateway.into(),
|
||||
@@ -391,6 +398,7 @@ where
|
||||
// we didn't get full details from the store and we have loaded some keys
|
||||
// so we can only continue if we're allowed to overwrite keys
|
||||
if overwrite_data {
|
||||
println!("GatewaySetup::New - overwrite_data");
|
||||
ManagedKeys::generate_new(&mut rng)
|
||||
} else {
|
||||
return Err(ClientCoreError::ForbiddenKeyOverwrite);
|
||||
@@ -456,6 +464,7 @@ where
|
||||
{
|
||||
let mut rng = OsRng;
|
||||
let gateways = current_gateways(&mut rng, validator_servers.unwrap_or_default()).await?;
|
||||
dbg!(&gateways);
|
||||
|
||||
setup_gateway_from(
|
||||
setup,
|
||||
|
||||
@@ -25,6 +25,7 @@ use nym_task::TaskClient;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tungstenite::protocol::Message;
|
||||
@@ -68,6 +69,31 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
|
||||
shutdown: TaskClient,
|
||||
}
|
||||
|
||||
impl<C, St> fmt::Debug for GatewayClient<C, St> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("GatewayClient")
|
||||
.field("authenticated", &self.authenticated)
|
||||
.field("disabled_credentials_mode", &self.disabled_credentials_mode)
|
||||
.field("bandwidth_remaining", &self.bandwidth_remaining)
|
||||
.field("gateway_address", &self.gateway_address)
|
||||
.field("gateway_identity", &self.gateway_identity)
|
||||
.field("local_identity", &self.local_identity)
|
||||
.field("shared_key", &self.shared_key)
|
||||
// .field("connection", &self.connection)
|
||||
.field("packet_router", &self.packet_router)
|
||||
.field("response_timeout_duration", &self.response_timeout_duration)
|
||||
// .field("bandwidth_controller", &self.bandwidth_controller)
|
||||
.field(
|
||||
"should_reconnect_on_failure",
|
||||
&self.should_reconnect_on_failure,
|
||||
)
|
||||
.field("reconnection_attempts", &self.reconnection_attempts)
|
||||
.field("reconnection_backoff", &self.reconnection_backoff)
|
||||
.field("shutdown", &self.shutdown)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, St> GatewayClient<C, St> {
|
||||
// TODO: put it all in a Config struct
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::client_handling::websocket::message_receiver::MixMessageSender;
|
||||
use dashmap::DashMap;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::websocket::message_receiver::{IsActiveRequestSender, MixMessageSender};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ActiveClientsStore(Arc<DashMap<DestinationAddressBytes, MixMessageSender>>);
|
||||
pub(crate) struct ActiveClientsStore(
|
||||
Arc<DashMap<DestinationAddressBytes, (MixMessageSender, IsActiveRequestSender)>>,
|
||||
);
|
||||
|
||||
impl ActiveClientsStore {
|
||||
/// Creates new instance of `ActiveClientsStore` to store in-memory handles to all currently connected clients.
|
||||
@@ -21,13 +25,16 @@ impl ActiveClientsStore {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `client`: address of the client for which to obtain the handle.
|
||||
pub(crate) fn get(&self, client: DestinationAddressBytes) -> Option<MixMessageSender> {
|
||||
pub(crate) fn get(
|
||||
&self,
|
||||
client: DestinationAddressBytes,
|
||||
) -> Option<(MixMessageSender, IsActiveRequestSender)> {
|
||||
let entry = self.0.get(&client)?;
|
||||
let handle = entry.value();
|
||||
|
||||
// if the entry is stale, remove it from the map
|
||||
// if handle.is_valid() {
|
||||
if !handle.is_closed() {
|
||||
if !handle.0.is_closed() {
|
||||
Some(handle.clone())
|
||||
} else {
|
||||
// drop the reference to the map to prevent deadlocks
|
||||
@@ -52,8 +59,13 @@ impl ActiveClientsStore {
|
||||
///
|
||||
/// * `client`: address of the client for which to insert the handle.
|
||||
/// * `handle`: the sender channel for all mix packets to be pushed back onto the websocket
|
||||
pub(crate) fn insert(&self, client: DestinationAddressBytes, handle: MixMessageSender) {
|
||||
self.0.insert(client, handle);
|
||||
pub(crate) fn insert(
|
||||
&self,
|
||||
client: DestinationAddressBytes,
|
||||
handle: MixMessageSender,
|
||||
is_active_sender: mpsc::UnboundedSender<oneshot::Sender<bool>>,
|
||||
) {
|
||||
self.0.insert(client, (handle, is_active_sender));
|
||||
}
|
||||
|
||||
/// Get number of active clients in store
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::client_handling::websocket::connection_handler::{ClientDetails, FreshHandler};
|
||||
use crate::node::client_handling::websocket::message_receiver::MixMessageReceiver;
|
||||
use crate::node::client_handling::websocket::message_receiver::{
|
||||
IsActiveRequestReceiver, MixMessageReceiver,
|
||||
};
|
||||
use crate::node::storage::error::StorageError;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::StreamExt;
|
||||
@@ -97,6 +99,7 @@ pub(crate) struct AuthenticatedHandler<R, S, St> {
|
||||
inner: FreshHandler<R, S, St>,
|
||||
client: ClientDetails,
|
||||
mix_receiver: MixMessageReceiver,
|
||||
is_active_request_receiver: IsActiveRequestReceiver,
|
||||
}
|
||||
|
||||
// explicitly remove handle from the global store upon being dropped
|
||||
@@ -127,11 +130,13 @@ where
|
||||
fresh: FreshHandler<R, S, St>,
|
||||
client: ClientDetails,
|
||||
mix_receiver: MixMessageReceiver,
|
||||
is_active_request_receiver: IsActiveRequestReceiver,
|
||||
) -> Self {
|
||||
AuthenticatedHandler {
|
||||
inner: fresh,
|
||||
client,
|
||||
mix_receiver,
|
||||
is_active_request_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,18 +356,38 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles pong message received from the client.
|
||||
/// If the client is still active, the handler that requested the ping will receive a reply.
|
||||
async fn handle_pong(&mut self, msg: Vec<u8>) {
|
||||
if let Ok(msg) = msg.try_into() {
|
||||
let msg = u64::from_be_bytes(msg);
|
||||
debug!("Received pong from client: {}", msg);
|
||||
if let Some(tx) = self.inner.is_active_ping_pending_replies.remove(&msg) {
|
||||
debug!("Reporting back");
|
||||
if let Err(err) = tx.send(true) {
|
||||
warn!("Failed to send pong reply back to the requesting handler: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to handle websocket message received from the connected client.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `raw_request`: raw received websocket message.
|
||||
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
|
||||
println!("handle_request");
|
||||
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
||||
// them and let's test that claim. If that's not the case, just copy code from
|
||||
// desktop nym-client websocket as I've manually handled everything there
|
||||
match raw_request {
|
||||
Message::Binary(bin_msg) => Some(self.handle_binary(bin_msg).await),
|
||||
Message::Text(text_msg) => Some(self.handle_text(text_msg).await),
|
||||
Message::Pong(msg) => {
|
||||
self.handle_pong(msg).await;
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -381,8 +406,23 @@ where
|
||||
tokio::select! {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("client_handling::AuthenticatedHandler: received shutdown");
|
||||
}
|
||||
},
|
||||
tx = self.is_active_request_receiver.next() => {
|
||||
match tx {
|
||||
None => debug!("is_active_request_receiver was closed!"),
|
||||
Some(reply_tx) => {
|
||||
let tag: u64 = rand::thread_rng().gen();
|
||||
debug!("Got request to ping our connection: {}", tag);
|
||||
if let Err(err) = self.inner.send_websocket_message(Message::Ping(tag.to_be_bytes().to_vec())).await {
|
||||
warn!("Failed to send ping to client: {err}. Assuming the connection is dead.");
|
||||
break;
|
||||
}
|
||||
self.inner.is_active_ping_pending_replies.insert(tag, reply_tx);
|
||||
}
|
||||
};
|
||||
},
|
||||
socket_msg = self.inner.read_websocket_message() => {
|
||||
println!("socket_msg");
|
||||
let socket_msg = match socket_msg {
|
||||
None => break,
|
||||
Some(Ok(socket_msg)) => socket_msg,
|
||||
|
||||
@@ -6,8 +6,10 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
|
||||
use crate::node::client_handling::websocket::connection_handler::{
|
||||
AuthenticatedHandler, ClientDetails, InitialAuthResult, SocketStream,
|
||||
};
|
||||
use crate::node::client_handling::websocket::message_receiver::IsActiveResultSender;
|
||||
use crate::node::storage::error::StorageError;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::channel::oneshot;
|
||||
use futures::{channel::mpsc, SinkExt, StreamExt};
|
||||
use log::*;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
@@ -22,8 +24,10 @@ use nym_gateway_requests::{BinaryResponse, PROTOCOL_VERSION};
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
|
||||
@@ -75,6 +79,10 @@ pub(crate) struct FreshHandler<R, S, St> {
|
||||
pub(crate) socket_connection: SocketStream<S>,
|
||||
pub(crate) storage: St,
|
||||
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
|
||||
// Occasionally the handler is requested to ping the connected client for confirm that it's
|
||||
// active, such as when a duplicate connection is detected. This hashmap stores the oneshot
|
||||
// senders that are used to return the result of the ping to the handler requesting the ping.
|
||||
pub(crate) is_active_ping_pending_replies: HashMap<u64, IsActiveResultSender>,
|
||||
}
|
||||
|
||||
impl<R, S, St> FreshHandler<R, S, St>
|
||||
@@ -106,6 +114,7 @@ where
|
||||
local_identity,
|
||||
storage,
|
||||
coconut_verifier,
|
||||
is_active_ping_pending_replies: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,8 +427,39 @@ where
|
||||
let encrypted_address = EncryptedAddressBytes::try_from_base58_string(enc_address)?;
|
||||
let iv = IV::try_from_base58_string(iv)?;
|
||||
|
||||
if self.active_clients_store.get(address).is_some() {
|
||||
return Err(InitialAuthenticationError::DuplicateConnection);
|
||||
if let Some((__, mut is_active_request_tx)) = self.active_clients_store.get(address) {
|
||||
log::warn!("Detected duplicate connection for client: {}", address);
|
||||
|
||||
// Ask the other connection to ping if they are still active.
|
||||
// Use a oneshot channel to return the result to us
|
||||
let (ping_result_sender, ping_result_receiver) = oneshot::channel::<bool>();
|
||||
log::debug!("Asking other connection to ping the connection client");
|
||||
is_active_request_tx.send(ping_result_sender).await.ok();
|
||||
|
||||
// Wait for the reply
|
||||
match tokio::time::timeout(Duration::from_millis(1000), ping_result_receiver).await {
|
||||
Ok(Ok(res)) => {
|
||||
if res {
|
||||
// The other handled reported a positive reply, so we have to assume it's
|
||||
// still active and disconnect this connection.
|
||||
log::info!("Other handler reports it is active");
|
||||
return Err(InitialAuthenticationError::DuplicateConnection);
|
||||
} else {
|
||||
log::debug!("Other handler reports it is not active");
|
||||
self.active_clients_store.disconnect(address);
|
||||
}
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
// Other channel failed to reply (the channel sender probably dropped)
|
||||
log::info!("Other connection failed to reply, disconnecting it in favour of this new connection");
|
||||
self.active_clients_store.disconnect(address);
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout waiting for reply
|
||||
log::warn!("Other connection timed out, disconnecting it in favour of this new connection");
|
||||
self.active_clients_store.disconnect(address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let shared_keys = self
|
||||
@@ -504,7 +544,8 @@ where
|
||||
let remote_address = remote_identity.derive_destination_address();
|
||||
|
||||
if self.active_clients_store.get(remote_address).is_some() {
|
||||
return Err(InitialAuthenticationError::DuplicateConnection);
|
||||
println!("duplicate connection when registering");
|
||||
// return Err(InitialAuthenticationError::DuplicateConnection);
|
||||
}
|
||||
|
||||
let shared_keys = self.perform_registration_handshake(init_data).await?;
|
||||
@@ -606,12 +647,19 @@ where
|
||||
}
|
||||
|
||||
return if let Some(client_details) = auth_result.client_details {
|
||||
self.active_clients_store
|
||||
.insert(client_details.address, mix_sender);
|
||||
// Channel for handlers to ask other handlers if they are still active.
|
||||
let (is_active_request_sender, is_active_request_receiver) =
|
||||
mpsc::unbounded();
|
||||
self.active_clients_store.insert(
|
||||
client_details.address,
|
||||
mix_sender,
|
||||
is_active_request_sender,
|
||||
);
|
||||
Some(AuthenticatedHandler::upgrade(
|
||||
self,
|
||||
client_details,
|
||||
mix_receiver,
|
||||
is_active_request_receiver,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -100,7 +100,7 @@ pub(crate) async fn handle_connection<R, S, St>(
|
||||
.await
|
||||
{
|
||||
None => {
|
||||
trace!("received shutdown signal while performing initial authetnication");
|
||||
trace!("received shutdown signal while performing initial authentication");
|
||||
return;
|
||||
}
|
||||
Some(None) => {
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
|
||||
pub(crate) type MixMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
|
||||
pub(crate) type MixMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
|
||||
|
||||
// Channels used for one handler to requester another handler to check that the client is still
|
||||
// active. The result is then passed back to the requesting handler in the oneshot channel.
|
||||
pub(crate) type IsActiveRequestSender = mpsc::UnboundedSender<IsActiveResultSender>;
|
||||
pub(crate) type IsActiveRequestReceiver = mpsc::UnboundedReceiver<IsActiveResultSender>;
|
||||
pub(crate) type IsActiveResultSender = oneshot::Sender<bool>;
|
||||
|
||||
@@ -70,7 +70,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
}
|
||||
|
||||
fn update_clients_store_cache_entry(&mut self, client_address: DestinationAddressBytes) {
|
||||
if let Some(client_sender) = self.active_clients_store.get(client_address) {
|
||||
if let Some((client_sender, _)) = self.active_clients_store.get(client_address) {
|
||||
self.clients_store_cache
|
||||
.insert(client_address, client_sender);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user