Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 59455d3f13 | |||
| 329346d952 | |||
| 9495d9821b | |||
| 2b8f49f918 | |||
| 128dfa6d81 | |||
| 0b0ec075bb | |||
| cca4d21e7c |
Generated
+2
@@ -705,6 +705,8 @@ dependencies = [
|
||||
"time 0.3.17",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-tungstenite 0.14.0",
|
||||
"tungstenite 0.13.0",
|
||||
"url",
|
||||
"validator-client",
|
||||
"wasm-bindgen",
|
||||
|
||||
@@ -20,6 +20,7 @@ serde_json = "1.0.89"
|
||||
tap = "1.0.1"
|
||||
thiserror = "1.0.34"
|
||||
url = { version ="2.2", features = ["serde"] }
|
||||
tungstenite = { version = "0.13.0", default-features = false }
|
||||
tokio = { version = "1.24.1", features = ["macros"]}
|
||||
time = "0.3.17"
|
||||
|
||||
@@ -44,6 +45,9 @@ features = ["time"]
|
||||
version = "1.24.1"
|
||||
features = ["time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
|
||||
version = "0.14"
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
version = "0.6.2"
|
||||
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
|
||||
@@ -65,6 +69,7 @@ features = ["futures"]
|
||||
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
|
||||
path = "../../common/wasm-utils"
|
||||
features = ["websocket"]
|
||||
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.time]
|
||||
version = "0.3.17"
|
||||
|
||||
@@ -304,7 +304,7 @@ where
|
||||
}
|
||||
let gateway_address = self.gateway_config.gateway_listener.clone();
|
||||
if gateway_address.is_empty() {
|
||||
return Err(ClientCoreError::GatwayAddressUnknown);
|
||||
return Err(ClientCoreError::GatewayAddressUnknown);
|
||||
}
|
||||
|
||||
let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use gateway_client::error::GatewayClientError;
|
||||
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
|
||||
use nym_topology::gateway::GatewayConversionError;
|
||||
use nym_topology::NymTopologyError;
|
||||
use validator_client::ValidatorClientError;
|
||||
|
||||
@@ -53,7 +54,32 @@ pub enum ClientCoreError {
|
||||
GatewayOwnerUnknown,
|
||||
|
||||
#[error("The address of the gateway is unknown - did you run init?")]
|
||||
GatwayAddressUnknown,
|
||||
GatewayAddressUnknown,
|
||||
|
||||
#[error("The gateway is malformed: {source}")]
|
||||
MalformedGateway {
|
||||
#[from]
|
||||
source: GatewayConversionError,
|
||||
},
|
||||
|
||||
#[error("failed to establish connection to gateway: {source}")]
|
||||
GatewayConnectionFailure {
|
||||
#[from]
|
||||
source: tungstenite::Error,
|
||||
},
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[error("failed to establish gateway connection (wasm)")]
|
||||
GatewayJsConnectionFailure,
|
||||
|
||||
#[error("Gateway connection was abruptly closed")]
|
||||
GatewayConnectionAbruptlyClosed,
|
||||
|
||||
#[error("Timed out while trying to establish gateway connection")]
|
||||
GatewayConnectionTimeout,
|
||||
|
||||
#[error("No ping measurements for the gateway ({identity}) performed")]
|
||||
NoGatewayMeasurements { identity: String },
|
||||
|
||||
#[error("failed to register receiver for reconstructed mixnet messages")]
|
||||
FailedToRegisterReceiver,
|
||||
|
||||
@@ -6,52 +6,223 @@ use crate::{
|
||||
config::{persistence::key_pathfinder::ClientKeyPathfinder, Config},
|
||||
error::ClientCoreError,
|
||||
};
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use gateway_client::wasm_mockups::SigningNyxdClient;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use gateway_client::GatewayClient;
|
||||
use gateway_requests::registration::handshake::SharedKeys;
|
||||
use log::{debug, info, trace, warn};
|
||||
use nym_config::NymConfig;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_topology::{filter::VersionFilterable, gateway};
|
||||
use rand::{seq::SliceRandom, thread_rng};
|
||||
use rand::{seq::SliceRandom, thread_rng, Rng};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tap::TapFallible;
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::Instant;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::connect_async;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use validator_client::nyxd::SigningNyxdClient;
|
||||
|
||||
pub(super) async fn query_gateway_details(
|
||||
validator_servers: Vec<Url>,
|
||||
chosen_gateway_id: Option<identity::PublicKey>,
|
||||
) -> Result<gateway::Node, ClientCoreError> {
|
||||
let nym_api = validator_servers
|
||||
.choose(&mut thread_rng())
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use gateway_client::wasm_mockups::SigningNyxdClient;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_timer::Instant;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_utils::websocket::JSWebsocket;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
type WsConn = JSWebsocket;
|
||||
|
||||
const MEASUREMENTS: usize = 3;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
const CONN_TIMEOUT: Duration = Duration::from_millis(1500);
|
||||
const PING_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||
|
||||
struct GatewayWithLatency {
|
||||
gateway: gateway::Node,
|
||||
latency: Duration,
|
||||
}
|
||||
|
||||
impl GatewayWithLatency {
|
||||
fn new(gateway: gateway::Node, latency: Duration) -> Self {
|
||||
GatewayWithLatency { gateway, latency }
|
||||
}
|
||||
}
|
||||
|
||||
async fn current_gateways<R: Rng>(
|
||||
rng: &mut R,
|
||||
nym_apis: Vec<Url>,
|
||||
) -> Result<Vec<gateway::Node>, ClientCoreError> {
|
||||
let nym_api = nym_apis
|
||||
.choose(rng)
|
||||
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
|
||||
let validator_client = validator_client::client::NymApiClient::new(nym_api.clone());
|
||||
let client = validator_client::client::NymApiClient::new(nym_api.clone());
|
||||
|
||||
log::trace!("Fetching list of gateways from: {}", nym_api);
|
||||
let gateways = validator_client.get_cached_gateways().await?;
|
||||
|
||||
let gateways = client.get_cached_gateways().await?;
|
||||
let valid_gateways = gateways
|
||||
.into_iter()
|
||||
.filter_map(|gateway| gateway.try_into().ok())
|
||||
.collect::<Vec<gateway::Node>>();
|
||||
|
||||
// we were always filtering by version so I'm not removing that 'feature'
|
||||
let filtered_gateways = valid_gateways.filter_by_version(env!("CARGO_PKG_VERSION"));
|
||||
Ok(filtered_gateways)
|
||||
}
|
||||
|
||||
// if we have chosen particular gateway - use it, otherwise choose a random one.
|
||||
// (remember that in active topology all gateways have at least 100 reputation so should
|
||||
// be working correctly)
|
||||
if let Some(gateway_id) = chosen_gateway_id {
|
||||
filtered_gateways
|
||||
.iter()
|
||||
.find(|gateway| gateway.identity_key == gateway_id)
|
||||
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_id.to_string()))
|
||||
.cloned()
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
|
||||
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
|
||||
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
|
||||
Ok(Err(conn_failure)) => Err(conn_failure.into()),
|
||||
Ok(Ok((stream, _))) => Ok(stream),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
|
||||
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
|
||||
}
|
||||
|
||||
async fn measure_latency(gateway: gateway::Node) -> Result<GatewayWithLatency, ClientCoreError> {
|
||||
let addr = gateway.clients_address();
|
||||
trace!(
|
||||
"establishing connection to {} ({addr})...",
|
||||
gateway.identity_key,
|
||||
);
|
||||
let mut stream = connect(&addr).await?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for _ in 0..MEASUREMENTS {
|
||||
let measurement_future = async {
|
||||
let ping_content = vec![1, 2, 3];
|
||||
let start = Instant::now();
|
||||
stream.send(Message::Ping(ping_content.clone())).await?;
|
||||
|
||||
match stream.next().await {
|
||||
Some(Ok(Message::Pong(content))) => {
|
||||
if content == ping_content {
|
||||
let elapsed = Instant::now().duration_since(start);
|
||||
trace!("current ping time: {elapsed:?}");
|
||||
results.push(elapsed);
|
||||
} else {
|
||||
warn!("received a pong message with different content? wtf.")
|
||||
}
|
||||
}
|
||||
Some(Ok(_)) => warn!("received a message that's not a pong!"),
|
||||
Some(Err(err)) => return Err(err.into()),
|
||||
None => return Err(ClientCoreError::GatewayConnectionAbruptlyClosed),
|
||||
}
|
||||
|
||||
Ok::<(), ClientCoreError>(())
|
||||
};
|
||||
|
||||
// thanks to wasm we can't use tokio::time::timeout : (
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let timeout = tokio::time::sleep(PING_TIMEOUT);
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
tokio::pin!(timeout);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let mut timeout = wasm_timer::Delay::new(PING_TIMEOUT);
|
||||
|
||||
tokio::select! {
|
||||
_ = &mut timeout => {
|
||||
warn!("timed out while trying to perform measurement...")
|
||||
}
|
||||
res = measurement_future => res?,
|
||||
}
|
||||
}
|
||||
|
||||
let count = results.len() as u64;
|
||||
if count == 0 {
|
||||
return Err(ClientCoreError::NoGatewayMeasurements {
|
||||
identity: gateway.identity_key.to_base58_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let sum: Duration = results.into_iter().sum();
|
||||
let avg = Duration::from_nanos(sum.as_nanos() as u64 / count);
|
||||
|
||||
Ok(GatewayWithLatency::new(gateway, avg))
|
||||
}
|
||||
|
||||
async fn choose_gateway_by_latency<R: Rng>(
|
||||
rng: &mut R,
|
||||
gateways: Vec<gateway::Node>,
|
||||
) -> Result<gateway::Node, ClientCoreError> {
|
||||
info!("choosing gateway by latency...");
|
||||
|
||||
let mut gateways_with_latency = Vec::new();
|
||||
for gateway in gateways {
|
||||
let id = *gateway.identity();
|
||||
trace!("measuring latency to {id}...");
|
||||
let with_latency = match measure_latency(gateway).await {
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
warn!("failed to measure {id}: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"{id} ({}): {:?}",
|
||||
with_latency.gateway.location, with_latency.latency
|
||||
);
|
||||
gateways_with_latency.push(with_latency)
|
||||
}
|
||||
|
||||
let chosen = gateways_with_latency
|
||||
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
|
||||
.expect("invalid selection weight!");
|
||||
|
||||
info!(
|
||||
"chose gateway {} (located at {}) with average latency of {:?}",
|
||||
chosen.gateway.identity_key, chosen.gateway.location, chosen.latency
|
||||
);
|
||||
|
||||
Ok(chosen.gateway.clone())
|
||||
}
|
||||
|
||||
fn uniformly_random_gateway<R: Rng>(
|
||||
rng: &mut R,
|
||||
gateways: Vec<gateway::Node>,
|
||||
) -> Result<gateway::Node, ClientCoreError> {
|
||||
gateways
|
||||
.choose(rng)
|
||||
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
|
||||
.cloned()
|
||||
}
|
||||
|
||||
pub(super) async fn query_gateway_details(
|
||||
validator_servers: Vec<Url>,
|
||||
chosen_gateway_id: Option<identity::PublicKey>,
|
||||
by_latency: bool,
|
||||
) -> Result<gateway::Node, ClientCoreError> {
|
||||
let mut rng = thread_rng();
|
||||
let gateways = current_gateways(&mut rng, validator_servers).await?;
|
||||
|
||||
// if we set an explicit gateway, use that one and nothing else
|
||||
if let Some(explicitly_chosen) = chosen_gateway_id {
|
||||
gateways
|
||||
.into_iter()
|
||||
.find(|gateway| gateway.identity_key == explicitly_chosen)
|
||||
.ok_or_else(|| ClientCoreError::NoGatewayWithId(explicitly_chosen.to_string()))
|
||||
} else if by_latency {
|
||||
choose_gateway_by_latency(&mut rng, gateways).await
|
||||
} else {
|
||||
filtered_gateways
|
||||
.choose(&mut rand::thread_rng())
|
||||
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
|
||||
.cloned()
|
||||
uniformly_random_gateway(&mut rng, gateways)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -77,9 +77,11 @@ pub async fn register_with_gateway(
|
||||
key_manager: &mut KeyManager,
|
||||
nym_api_endpoints: Vec<Url>,
|
||||
chosen_gateway_id: Option<identity::PublicKey>,
|
||||
by_latency: bool,
|
||||
) -> Result<GatewayEndpointConfig, ClientCoreError> {
|
||||
// Get the gateway details of the gateway we will use
|
||||
let gateway = helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id).await?;
|
||||
let gateway =
|
||||
helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id, by_latency).await?;
|
||||
log::debug!("Querying gateway gives: {}", gateway);
|
||||
|
||||
let our_identity = key_manager.identity_keypair();
|
||||
@@ -102,6 +104,7 @@ pub async fn setup_gateway_from_config<C, T>(
|
||||
register_gateway: bool,
|
||||
user_chosen_gateway_id: Option<identity::PublicKey>,
|
||||
config: &Config<T>,
|
||||
by_latency: bool,
|
||||
) -> Result<GatewayEndpointConfig, ClientCoreError>
|
||||
where
|
||||
C: NymConfig + ClientCoreConfigTrait,
|
||||
@@ -117,9 +120,12 @@ where
|
||||
}
|
||||
|
||||
// Else, we preceed by querying the nym-api
|
||||
let gateway =
|
||||
helpers::query_gateway_details(config.get_nym_api_endpoints(), user_chosen_gateway_id)
|
||||
.await?;
|
||||
let gateway = helpers::query_gateway_details(
|
||||
config.get_nym_api_endpoints(),
|
||||
user_chosen_gateway_id,
|
||||
by_latency,
|
||||
)
|
||||
.await?;
|
||||
log::debug!("Querying gateway gives: {}", gateway);
|
||||
|
||||
// If we are not registering, just return this and assume the caller has the keys already and
|
||||
|
||||
@@ -51,8 +51,11 @@ async fn block_until_coconut_is_available<C: Clone + CosmWasmClient + Send + Syn
|
||||
|
||||
break;
|
||||
} else {
|
||||
// Use 20 additional seconds to avoid the exact moment of going into the final epoch state
|
||||
let secs_until_final = epoch.final_timestamp_secs() + 20 - current_timestamp_secs;
|
||||
// Use 1 additional second to not start the next iteration immediately and spam get_current_epoch queries
|
||||
let secs_until_final = epoch
|
||||
.final_timestamp_secs()
|
||||
.saturating_sub(current_timestamp_secs)
|
||||
+ 1;
|
||||
info!("Approximately {} seconds until coconut is available. Sleeping until then. You can safely kill the process at any moment.", secs_until_final);
|
||||
std::thread::sleep(Duration::from_secs(secs_until_final));
|
||||
}
|
||||
|
||||
@@ -25,6 +25,11 @@ pub(crate) struct Init {
|
||||
#[clap(long)]
|
||||
gateway: Option<identity::PublicKey>,
|
||||
|
||||
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
|
||||
/// uniformly.
|
||||
#[clap(long, conflicts_with = "gateway")]
|
||||
latency_based_selection: bool,
|
||||
|
||||
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
|
||||
/// potentially causing loss of access.
|
||||
#[clap(long)]
|
||||
@@ -143,6 +148,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> {
|
||||
register_gateway,
|
||||
user_chosen_gateway_id,
|
||||
config.get_base(),
|
||||
args.latency_based_selection,
|
||||
)
|
||||
.await
|
||||
.tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?;
|
||||
|
||||
@@ -37,6 +37,11 @@ pub(crate) struct Init {
|
||||
#[clap(long)]
|
||||
gateway: Option<identity::PublicKey>,
|
||||
|
||||
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
|
||||
/// uniformly.
|
||||
#[clap(long, conflicts_with = "gateway")]
|
||||
latency_based_selection: bool,
|
||||
|
||||
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
|
||||
/// potentially causing loss of access.
|
||||
#[clap(long)]
|
||||
@@ -149,6 +154,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), Socks5ClientError> {
|
||||
register_gateway,
|
||||
user_chosen_gateway_id,
|
||||
config.get_base(),
|
||||
args.latency_based_selection,
|
||||
)
|
||||
.await
|
||||
.tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?;
|
||||
|
||||
@@ -11,9 +11,7 @@ use nym_api_requests::models::{
|
||||
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
|
||||
RewardEstimationResponse, StakeSaturationResponse,
|
||||
};
|
||||
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::MixId;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef};
|
||||
pub use nym_mixnet_contract_common::{mixnode::MixNodeDetails, GatewayBond, IdentityKeyRef, MixId};
|
||||
|
||||
#[cfg(feature = "nyxd-client")]
|
||||
use crate::nyxd::traits::{DkgQueryClient, MixnetQueryClient, MultisigQueryClient};
|
||||
|
||||
@@ -80,7 +80,7 @@ pub(crate) mod tests {
|
||||
use crate::epoch_state::transactions::advance_epoch_state;
|
||||
use crate::support::tests::fixtures::dealer_details_fixture;
|
||||
use crate::support::tests::helpers;
|
||||
use crate::support::tests::helpers::GROUP_MEMBERS;
|
||||
use crate::support::tests::helpers::{add_fixture_dealer, GROUP_MEMBERS};
|
||||
use coconut_dkg_common::types::{InitialReplacementData, TimeConfiguration};
|
||||
use cosmwasm_std::testing::{mock_env, mock_info};
|
||||
use cw4::Member;
|
||||
@@ -147,6 +147,8 @@ pub(crate) mod tests {
|
||||
.block
|
||||
.time
|
||||
.plus_seconds(TimeConfiguration::default().public_key_submission_time_secs);
|
||||
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
advance_epoch_state(deps.as_mut(), env).unwrap();
|
||||
|
||||
let ret = try_add_dealer(
|
||||
|
||||
@@ -53,6 +53,7 @@ pub(crate) mod tests {
|
||||
use crate::epoch_state::transactions::advance_epoch_state;
|
||||
use crate::support::tests::fixtures::{dealer_details_fixture, dealing_bytes_fixture};
|
||||
use crate::support::tests::helpers;
|
||||
use crate::support::tests::helpers::add_fixture_dealer;
|
||||
use coconut_dkg_common::dealer::DealerDetails;
|
||||
use coconut_dkg_common::types::{InitialReplacementData, TimeConfiguration};
|
||||
use cosmwasm_std::testing::{mock_env, mock_info};
|
||||
@@ -80,6 +81,7 @@ pub(crate) mod tests {
|
||||
.block
|
||||
.time
|
||||
.plus_seconds(TimeConfiguration::default().public_key_submission_time_secs);
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
advance_epoch_state(deps.as_mut(), env).unwrap();
|
||||
|
||||
let ret = try_commit_dealings(deps.as_mut(), info.clone(), dealing_bytes.clone(), false)
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::epoch_state::storage::{CURRENT_EPOCH, INITIAL_REPLACEMENT_DATA, THRES
|
||||
use crate::epoch_state::utils::check_epoch_state;
|
||||
use crate::error::ContractError;
|
||||
use crate::state::STATE;
|
||||
use crate::verification_key_shares::storage::vk_shares;
|
||||
use coconut_dkg_common::types::{Epoch, EpochState, InitialReplacementData};
|
||||
use cosmwasm_std::{Addr, Deps, DepsMut, Env, Order, Response, Storage};
|
||||
|
||||
@@ -89,23 +90,29 @@ pub(crate) fn advance_epoch_state(deps: DepsMut<'_>, env: Env) -> Result<Respons
|
||||
let current_epoch = CURRENT_EPOCH.load(deps.storage)?;
|
||||
let next_epoch = if let Some(state) = current_epoch.state.next() {
|
||||
// We are during DKG process
|
||||
let mut new_state = state;
|
||||
if let EpochState::DealingExchange { resharing } = state {
|
||||
let current_dealers = current_dealers()
|
||||
.keys(deps.storage, None, None, Order::Ascending)
|
||||
.collect::<Result<Vec<Addr>, _>>()?;
|
||||
// note: ceiling in integer division can be achieved via q = (x + y - 1) / y;
|
||||
let threshold = (2 * current_dealers.len() as u64 + 3 - 1) / 3;
|
||||
THRESHOLD.save(deps.storage, &threshold)?;
|
||||
if !resharing {
|
||||
let replacement_data = InitialReplacementData {
|
||||
initial_dealers: current_dealers,
|
||||
initial_height: None,
|
||||
};
|
||||
INITIAL_REPLACEMENT_DATA.save(deps.storage, &replacement_data)?;
|
||||
if current_dealers.is_empty() {
|
||||
// If no dealer registered yet, we just stay in the same state until there's at least one
|
||||
new_state = current_epoch.state;
|
||||
} else {
|
||||
// note: ceiling in integer division can be achieved via q = (x + y - 1) / y;
|
||||
let threshold = (2 * current_dealers.len() as u64 + 3 - 1) / 3;
|
||||
THRESHOLD.save(deps.storage, &threshold)?;
|
||||
if !resharing {
|
||||
let replacement_data = InitialReplacementData {
|
||||
initial_dealers: current_dealers,
|
||||
initial_height: None,
|
||||
};
|
||||
INITIAL_REPLACEMENT_DATA.save(deps.storage, &replacement_data)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Epoch::new(
|
||||
state,
|
||||
new_state,
|
||||
current_epoch.epoch_id,
|
||||
current_epoch.time_configuration,
|
||||
env.block.time,
|
||||
@@ -152,9 +159,16 @@ pub(crate) fn try_surpassed_threshold(
|
||||
check_epoch_state(deps.storage, EpochState::InProgress)?;
|
||||
|
||||
let threshold = THRESHOLD.load(deps.storage)?;
|
||||
let dealers = current_dealers()
|
||||
.keys(deps.storage, None, None, Order::Ascending)
|
||||
.flatten();
|
||||
let dealers = vk_shares()
|
||||
.range(deps.storage, None, None, Order::Ascending)
|
||||
.flatten()
|
||||
.filter_map(|(_, share)| {
|
||||
if share.verified {
|
||||
Some(share.owner)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
if dealers_still_active(&deps.as_ref(), dealers)? < threshold as usize {
|
||||
reset_epoch_state(deps.storage)?;
|
||||
CURRENT_EPOCH.update::<_, ContractError>(deps.storage, |epoch| {
|
||||
@@ -174,7 +188,7 @@ pub(crate) fn try_surpassed_threshold(
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::error::ContractError::EarlyEpochStateAdvancement;
|
||||
use crate::support::tests::fixtures::dealer_details_fixture;
|
||||
use crate::support::tests::fixtures::{dealer_details_fixture, vk_share_fixture};
|
||||
use crate::support::tests::helpers::{init_contract, GROUP_MEMBERS};
|
||||
use coconut_dkg_common::types::{
|
||||
ContractSafeBytes, DealerDetails, EpochState, TimeConfiguration,
|
||||
@@ -392,6 +406,14 @@ pub(crate) mod tests {
|
||||
EarlyEpochStateAdvancement(1)
|
||||
);
|
||||
|
||||
env.block.time = env.block.time.plus_seconds(1);
|
||||
advance_epoch_state(deps.as_mut(), env.clone()).unwrap();
|
||||
let epoch = CURRENT_EPOCH.load(deps.as_mut().storage).unwrap();
|
||||
assert_eq!(
|
||||
epoch.state,
|
||||
EpochState::PublicKeySubmission { resharing: false }
|
||||
);
|
||||
|
||||
// setup dealer details
|
||||
let all_details: [_; 4] = std::array::from_fn(|i| dealer_details_fixture(i as u64 + 1));
|
||||
for details in all_details.iter() {
|
||||
@@ -404,7 +426,7 @@ pub(crate) mod tests {
|
||||
.may_load(&deps.storage)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
env.block.time = env.block.time.plus_seconds(1);
|
||||
env.block.time = env.block.time.plus_seconds(epoch.time_configuration.public_key_submission_time_secs);
|
||||
advance_epoch_state(deps.as_mut(), env.clone()).unwrap();
|
||||
let epoch = CURRENT_EPOCH.load(deps.as_mut().storage).unwrap();
|
||||
assert_eq!(
|
||||
@@ -664,6 +686,12 @@ pub(crate) mod tests {
|
||||
.save(deps.as_mut().storage, &details.address, details)
|
||||
.unwrap();
|
||||
}
|
||||
let all_shares: [_; 3] = std::array::from_fn(|i| vk_share_fixture(&format!("owner{}", i + 1), 0));
|
||||
for share in all_shares.iter() {
|
||||
vk_shares()
|
||||
.save(deps.as_mut().storage, (&share.owner, share.epoch_id), share)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
for times in [
|
||||
time_configuration.public_key_submission_time_secs,
|
||||
|
||||
@@ -2,11 +2,13 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::contract::instantiate;
|
||||
use crate::dealers::storage::current_dealers;
|
||||
use coconut_dkg_common::msg::InstantiateMsg;
|
||||
use coconut_dkg_common::types::DealerDetails;
|
||||
use cosmwasm_std::testing::{mock_dependencies, mock_env, mock_info, MockApi, MockQuerier};
|
||||
use cosmwasm_std::{
|
||||
from_binary, to_binary, ContractResult, Empty, MemoryStorage, OwnedDeps, QuerierResult,
|
||||
SystemResult, WasmQuery,
|
||||
from_binary, to_binary, Addr, ContractResult, DepsMut, Empty, MemoryStorage, OwnedDeps,
|
||||
QuerierResult, SystemResult, WasmQuery,
|
||||
};
|
||||
use cw4::{Cw4QueryMsg, Member, MemberListResponse, MemberResponse};
|
||||
use lazy_static::lazy_static;
|
||||
@@ -22,6 +24,22 @@ lazy_static! {
|
||||
pub static ref GROUP_MEMBERS: Mutex<Vec<(Member, u64)>> = Mutex::new(vec![]);
|
||||
}
|
||||
|
||||
pub fn add_fixture_dealer(deps: DepsMut<'_>) {
|
||||
let owner = Addr::unchecked("owner");
|
||||
current_dealers()
|
||||
.save(
|
||||
deps.storage,
|
||||
&owner,
|
||||
&DealerDetails {
|
||||
address: owner.clone(),
|
||||
bte_public_key_with_proof: String::new(),
|
||||
announce_address: String::new(),
|
||||
assigned_index: 100,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn querier_handler(query: &WasmQuery) -> QuerierResult {
|
||||
let bin = match query {
|
||||
WasmQuery::Smart { contract_addr, msg } => {
|
||||
|
||||
@@ -91,7 +91,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::epoch_state::transactions::advance_epoch_state;
|
||||
use crate::support::tests::helpers;
|
||||
use crate::support::tests::helpers::MULTISIG_CONTRACT;
|
||||
use crate::support::tests::helpers::{add_fixture_dealer, MULTISIG_CONTRACT};
|
||||
use coconut_dkg_common::dealer::DealerDetails;
|
||||
use coconut_dkg_common::types::{EpochState, TimeConfiguration};
|
||||
use cosmwasm_std::testing::{mock_env, mock_info};
|
||||
@@ -104,6 +104,7 @@ mod tests {
|
||||
let info = mock_info("requester", &[]);
|
||||
let share = "share".to_string();
|
||||
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
env.block.time = env
|
||||
.block
|
||||
.time
|
||||
@@ -171,6 +172,7 @@ mod tests {
|
||||
.to_string()
|
||||
}
|
||||
);
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
env.block.time = env
|
||||
.block
|
||||
.time
|
||||
@@ -247,6 +249,7 @@ mod tests {
|
||||
}
|
||||
);
|
||||
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
env.block.time = env
|
||||
.block
|
||||
.time
|
||||
@@ -292,6 +295,7 @@ mod tests {
|
||||
let share = "share".to_string();
|
||||
let multisig_info = mock_info(MULTISIG_CONTRACT, &[]);
|
||||
|
||||
add_fixture_dealer(deps.as_mut());
|
||||
env.block.time = env
|
||||
.block
|
||||
.time
|
||||
|
||||
@@ -99,57 +99,66 @@ impl<R: RngCore + Clone> DkgController<R> {
|
||||
return;
|
||||
}
|
||||
if let Err(err) = self.state.is_consistent(epoch.state).await {
|
||||
error!("Epoch state is corrupted - {err}, the process should be terminated");
|
||||
return;
|
||||
}
|
||||
let ret = match epoch.state {
|
||||
EpochState::PublicKeySubmission { resharing } => {
|
||||
public_key_submission(&self.dkg_client, &mut self.state, resharing).await
|
||||
}
|
||||
EpochState::DealingExchange { resharing } => {
|
||||
dealing_exchange(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
self.rng.clone(),
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
EpochState::VerificationKeySubmission { resharing } => {
|
||||
let keypair_path = nym_pemstore::KeyPairPath::new(
|
||||
self.secret_key_path.clone(),
|
||||
self.verification_key_path.clone(),
|
||||
);
|
||||
verification_key_submission(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
&keypair_path,
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
EpochState::VerificationKeyValidation { resharing } => {
|
||||
verification_key_validation(&self.dkg_client, &mut self.state, resharing)
|
||||
debug!("Epoch state is corrupted - {err}. Awaiting for a DKG restart.");
|
||||
} else {
|
||||
let ret = match epoch.state {
|
||||
EpochState::PublicKeySubmission { resharing } => {
|
||||
public_key_submission(&self.dkg_client, &mut self.state, resharing)
|
||||
.await
|
||||
}
|
||||
EpochState::DealingExchange { resharing } => {
|
||||
dealing_exchange(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
self.rng.clone(),
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
EpochState::VerificationKeyFinalization { resharing } => {
|
||||
verification_key_finalization(&self.dkg_client, &mut self.state, resharing)
|
||||
}
|
||||
EpochState::VerificationKeySubmission { resharing } => {
|
||||
let keypair_path = nym_pemstore::KeyPairPath::new(
|
||||
self.secret_key_path.clone(),
|
||||
self.verification_key_path.clone(),
|
||||
);
|
||||
verification_key_submission(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
&keypair_path,
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
// Just wait, in case we need to redo dkg at some point
|
||||
EpochState::InProgress => {
|
||||
self.state.set_was_in_progress();
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
if let Err(err) = ret {
|
||||
warn!("Could not handle this iteration for the epoch state: {err}");
|
||||
} else if epoch.state != EpochState::InProgress {
|
||||
let persistent_state = PersistentState::from(&self.state);
|
||||
if let Err(err) =
|
||||
persistent_state.save_to_file(self.state.persistent_state_path())
|
||||
{
|
||||
warn!("Could not backup the state for this iteration: {err}");
|
||||
}
|
||||
EpochState::VerificationKeyValidation { resharing } => {
|
||||
verification_key_validation(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
EpochState::VerificationKeyFinalization { resharing } => {
|
||||
verification_key_finalization(
|
||||
&self.dkg_client,
|
||||
&mut self.state,
|
||||
resharing,
|
||||
)
|
||||
.await
|
||||
}
|
||||
// Just wait, in case we need to redo dkg at some point
|
||||
EpochState::InProgress => {
|
||||
self.state.set_was_in_progress();
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
if let Err(err) = ret {
|
||||
warn!("Could not handle this iteration for the epoch state: {err}");
|
||||
} else if epoch.state != EpochState::InProgress {
|
||||
let persistent_state = PersistentState::from(&self.state);
|
||||
if let Err(err) =
|
||||
persistent_state.save_to_file(self.state.persistent_state_path())
|
||||
{
|
||||
warn!("Could not backup the state for this iteration: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Ok(current_timestamp) =
|
||||
|
||||
@@ -133,7 +133,7 @@ impl ConsistentState for State {
|
||||
|
||||
fn proposal_id_value(&self) -> Result<u64, CoconutError> {
|
||||
self.proposal_id.ok_or(CoconutError::UnrecoverableState {
|
||||
reason: String::from("Proposal id should have benn set"),
|
||||
reason: String::from("Proposal id should have been set"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ pub enum CoconutError {
|
||||
#[error("Failed to recover assigned node index: {reason}")]
|
||||
NodeIndexRecoveryError { reason: String },
|
||||
|
||||
#[error("Unrecoverable state: {reason}. Process should be restarted")]
|
||||
#[error("Unrecoverable state: {reason}")]
|
||||
UnrecoverableState { reason: String },
|
||||
|
||||
#[error("DKG has not finished yet in order to derive the coconut key")]
|
||||
|
||||
@@ -138,6 +138,8 @@ pub async fn init_socks5_config(provider_address: String, chosen_gateway_id: Str
|
||||
register_gateway,
|
||||
Some(chosen_gateway_id),
|
||||
config.get_base(),
|
||||
// TODO: another instance where this setting should probably get used
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -118,6 +118,7 @@ pub async fn init_socks5_config(
|
||||
&mut key_manager,
|
||||
nym_api_endpoints,
|
||||
Some(chosen_gateway_id),
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -259,6 +259,8 @@ where
|
||||
&mut self.key_manager,
|
||||
self.config.nym_api_endpoints.clone(),
|
||||
user_chosen_gateway,
|
||||
// TODO: this should probably be configurable with the config
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -24,6 +24,11 @@ pub(crate) struct Init {
|
||||
#[clap(long)]
|
||||
gateway: Option<identity::PublicKey>,
|
||||
|
||||
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
|
||||
/// uniformly.
|
||||
#[clap(long, conflicts_with = "gateway")]
|
||||
latency_based_selection: bool,
|
||||
|
||||
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
|
||||
/// potentially causing loss of access.
|
||||
#[clap(long)]
|
||||
@@ -115,6 +120,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), NetworkRequesterError> {
|
||||
register_gateway,
|
||||
user_chosen_gateway_id,
|
||||
config.get_base(),
|
||||
args.latency_based_selection,
|
||||
)
|
||||
.await
|
||||
.map_err(|source| {
|
||||
|
||||
Reference in New Issue
Block a user