Compare commits

...

7 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacșu 59455d3f13 API auto-advance epoch even on corrupt states 2023-02-27 16:40:41 +02:00
Bogdan-Ștefan Neacșu 329346d952 Fix various dkg logs 2023-02-27 16:21:40 +02:00
Bogdan-Ștefan Neacșu 9495d9821b Compare verified vks against current group instead of initial dealers 2023-02-27 14:01:14 +02:00
Bogdan-Ștefan Neacșu 2b8f49f918 Extend public key submission in case no dealer registered 2023-02-24 17:24:29 +02:00
Jędrzej Stuczyński 128dfa6d81 Feature/latency based gateway selection (#3081)
* wip

* new option to select gateways based on latency

* further changes for wasm-compatibility

* post rebase fixes + clippy

I know, I should have probably included them properly during rebasing ¯\_(ツ)_/¯

* android change

* wasm: the gift that keeps on giving
2023-02-23 17:09:22 +00:00
Bogdan-Ștefan Neacşu 0b0ec075bb Use saturating_sub with an additional 1 second buffer (#3095) 2023-02-23 13:44:54 +01:00
Tommy Verrall cca4d21e7c Merge pull request #3097 from nymtech/feature/update-checker-to-use-master
Feature/update checker to use master
2023-02-23 14:24:59 +02:00
22 changed files with 403 additions and 106 deletions
Generated
+2
View File
@@ -705,6 +705,8 @@ dependencies = [
"time 0.3.17",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.14.0",
"tungstenite 0.13.0",
"url",
"validator-client",
"wasm-bindgen",
+5
View File
@@ -20,6 +20,7 @@ serde_json = "1.0.89"
tap = "1.0.1"
thiserror = "1.0.34"
url = { version ="2.2", features = ["serde"] }
tungstenite = { version = "0.13.0", default-features = false }
tokio = { version = "1.24.1", features = ["macros"]}
time = "0.3.17"
@@ -44,6 +45,9 @@ features = ["time"]
version = "1.24.1"
features = ["time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
version = "0.14"
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
version = "0.6.2"
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
@@ -65,6 +69,7 @@ features = ["futures"]
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
path = "../../common/wasm-utils"
features = ["websocket"]
[target."cfg(target_arch = \"wasm32\")".dependencies.time]
version = "0.3.17"
@@ -304,7 +304,7 @@ where
}
let gateway_address = self.gateway_config.gateway_listener.clone();
if gateway_address.is_empty() {
return Err(ClientCoreError::GatwayAddressUnknown);
return Err(ClientCoreError::GatewayAddressUnknown);
}
let gateway_identity = identity::PublicKey::from_base58_string(gateway_id)
+27 -1
View File
@@ -3,6 +3,7 @@
use gateway_client::error::GatewayClientError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use validator_client::ValidatorClientError;
@@ -53,7 +54,32 @@ pub enum ClientCoreError {
GatewayOwnerUnknown,
#[error("The address of the gateway is unknown - did you run init?")]
GatwayAddressUnknown,
GatewayAddressUnknown,
#[error("The gateway is malformed: {source}")]
MalformedGateway {
#[from]
source: GatewayConversionError,
},
#[error("failed to establish connection to gateway: {source}")]
GatewayConnectionFailure {
#[from]
source: tungstenite::Error,
},
#[cfg(target_arch = "wasm32")]
#[error("failed to establish gateway connection (wasm)")]
GatewayJsConnectionFailure,
#[error("Gateway connection was abruptly closed")]
GatewayConnectionAbruptlyClosed,
#[error("Timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,
#[error("No ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },
#[error("failed to register receiver for reconstructed mixnet messages")]
FailedToRegisterReceiver,
+195 -24
View File
@@ -6,52 +6,223 @@ use crate::{
config::{persistence::key_pathfinder::ClientKeyPathfinder, Config},
error::ClientCoreError,
};
#[cfg(target_arch = "wasm32")]
use gateway_client::wasm_mockups::SigningNyxdClient;
use futures::{SinkExt, StreamExt};
use gateway_client::GatewayClient;
use gateway_requests::registration::handshake::SharedKeys;
use log::{debug, info, trace, warn};
use nym_config::NymConfig;
use nym_crypto::asymmetric::identity;
use nym_topology::{filter::VersionFilterable, gateway};
use rand::{seq::SliceRandom, thread_rng};
use rand::{seq::SliceRandom, thread_rng, Rng};
use std::{sync::Arc, time::Duration};
use tap::TapFallible;
use tungstenite::Message;
use url::Url;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(not(target_arch = "wasm32"))]
use validator_client::nyxd::SigningNyxdClient;
pub(super) async fn query_gateway_details(
validator_servers: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
) -> Result<gateway::Node, ClientCoreError> {
let nym_api = validator_servers
.choose(&mut thread_rng())
#[cfg(not(target_arch = "wasm32"))]
type WsConn = WebSocketStream<MaybeTlsStream<TcpStream>>;
#[cfg(target_arch = "wasm32")]
use gateway_client::wasm_mockups::SigningNyxdClient;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
type WsConn = JSWebsocket;
const MEASUREMENTS: usize = 3;
#[cfg(not(target_arch = "wasm32"))]
const CONN_TIMEOUT: Duration = Duration::from_millis(1500);
const PING_TIMEOUT: Duration = Duration::from_millis(1000);
struct GatewayWithLatency {
gateway: gateway::Node,
latency: Duration,
}
impl GatewayWithLatency {
fn new(gateway: gateway::Node, latency: Duration) -> Self {
GatewayWithLatency { gateway, latency }
}
}
async fn current_gateways<R: Rng>(
rng: &mut R,
nym_apis: Vec<Url>,
) -> Result<Vec<gateway::Node>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let validator_client = validator_client::client::NymApiClient::new(nym_api.clone());
let client = validator_client::client::NymApiClient::new(nym_api.clone());
log::trace!("Fetching list of gateways from: {}", nym_api);
let gateways = validator_client.get_cached_gateways().await?;
let gateways = client.get_cached_gateways().await?;
let valid_gateways = gateways
.into_iter()
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<gateway::Node>>();
// we were always filtering by version so I'm not removing that 'feature'
let filtered_gateways = valid_gateways.filter_by_version(env!("CARGO_PKG_VERSION"));
Ok(filtered_gateways)
}
// if we have chosen particular gateway - use it, otherwise choose a random one.
// (remember that in active topology all gateways have at least 100 reputation so should
// be working correctly)
if let Some(gateway_id) = chosen_gateway_id {
filtered_gateways
.iter()
.find(|gateway| gateway.identity_key == gateway_id)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_id.to_string()))
.cloned()
#[cfg(not(target_arch = "wasm32"))]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure.into()),
Ok(Ok((stream, _))) => Ok(stream),
}
}
#[cfg(target_arch = "wasm32")]
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency(gateway: gateway::Node) -> Result<GatewayWithLatency, ClientCoreError> {
let addr = gateway.clients_address();
trace!(
"establishing connection to {} ({addr})...",
gateway.identity_key,
);
let mut stream = connect(&addr).await?;
let mut results = Vec::new();
for _ in 0..MEASUREMENTS {
let measurement_future = async {
let ping_content = vec![1, 2, 3];
let start = Instant::now();
stream.send(Message::Ping(ping_content.clone())).await?;
match stream.next().await {
Some(Ok(Message::Pong(content))) => {
if content == ping_content {
let elapsed = Instant::now().duration_since(start);
trace!("current ping time: {elapsed:?}");
results.push(elapsed);
} else {
warn!("received a pong message with different content? wtf.")
}
}
Some(Ok(_)) => warn!("received a message that's not a pong!"),
Some(Err(err)) => return Err(err.into()),
None => return Err(ClientCoreError::GatewayConnectionAbruptlyClosed),
}
Ok::<(), ClientCoreError>(())
};
// thanks to wasm we can't use tokio::time::timeout : (
#[cfg(not(target_arch = "wasm32"))]
let timeout = tokio::time::sleep(PING_TIMEOUT);
#[cfg(not(target_arch = "wasm32"))]
tokio::pin!(timeout);
#[cfg(target_arch = "wasm32")]
let mut timeout = wasm_timer::Delay::new(PING_TIMEOUT);
tokio::select! {
_ = &mut timeout => {
warn!("timed out while trying to perform measurement...")
}
res = measurement_future => res?,
}
}
let count = results.len() as u64;
if count == 0 {
return Err(ClientCoreError::NoGatewayMeasurements {
identity: gateway.identity_key.to_base58_string(),
});
}
let sum: Duration = results.into_iter().sum();
let avg = Duration::from_nanos(sum.as_nanos() as u64 / count);
Ok(GatewayWithLatency::new(gateway, avg))
}
async fn choose_gateway_by_latency<R: Rng>(
rng: &mut R,
gateways: Vec<gateway::Node>,
) -> Result<gateway::Node, ClientCoreError> {
info!("choosing gateway by latency...");
let mut gateways_with_latency = Vec::new();
for gateway in gateways {
let id = *gateway.identity();
trace!("measuring latency to {id}...");
let with_latency = match measure_latency(gateway).await {
Ok(res) => res,
Err(err) => {
warn!("failed to measure {id}: {err}");
continue;
}
};
debug!(
"{id} ({}): {:?}",
with_latency.gateway.location, with_latency.latency
);
gateways_with_latency.push(with_latency)
}
let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.expect("invalid selection weight!");
info!(
"chose gateway {} (located at {}) with average latency of {:?}",
chosen.gateway.identity_key, chosen.gateway.location, chosen.latency
);
Ok(chosen.gateway.clone())
}
fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: Vec<gateway::Node>,
) -> Result<gateway::Node, ClientCoreError> {
gateways
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
.cloned()
}
pub(super) async fn query_gateway_details(
validator_servers: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
by_latency: bool,
) -> Result<gateway::Node, ClientCoreError> {
let mut rng = thread_rng();
let gateways = current_gateways(&mut rng, validator_servers).await?;
// if we set an explicit gateway, use that one and nothing else
if let Some(explicitly_chosen) = chosen_gateway_id {
gateways
.into_iter()
.find(|gateway| gateway.identity_key == explicitly_chosen)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(explicitly_chosen.to_string()))
} else if by_latency {
choose_gateway_by_latency(&mut rng, gateways).await
} else {
filtered_gateways
.choose(&mut rand::thread_rng())
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
.cloned()
uniformly_random_gateway(&mut rng, gateways)
}
}
+10 -4
View File
@@ -77,9 +77,11 @@ pub async fn register_with_gateway(
key_manager: &mut KeyManager,
nym_api_endpoints: Vec<Url>,
chosen_gateway_id: Option<identity::PublicKey>,
by_latency: bool,
) -> Result<GatewayEndpointConfig, ClientCoreError> {
// Get the gateway details of the gateway we will use
let gateway = helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id).await?;
let gateway =
helpers::query_gateway_details(nym_api_endpoints, chosen_gateway_id, by_latency).await?;
log::debug!("Querying gateway gives: {}", gateway);
let our_identity = key_manager.identity_keypair();
@@ -102,6 +104,7 @@ pub async fn setup_gateway_from_config<C, T>(
register_gateway: bool,
user_chosen_gateway_id: Option<identity::PublicKey>,
config: &Config<T>,
by_latency: bool,
) -> Result<GatewayEndpointConfig, ClientCoreError>
where
C: NymConfig + ClientCoreConfigTrait,
@@ -117,9 +120,12 @@ where
}
// Else, we preceed by querying the nym-api
let gateway =
helpers::query_gateway_details(config.get_nym_api_endpoints(), user_chosen_gateway_id)
.await?;
let gateway = helpers::query_gateway_details(
config.get_nym_api_endpoints(),
user_chosen_gateway_id,
by_latency,
)
.await?;
log::debug!("Querying gateway gives: {}", gateway);
// If we are not registering, just return this and assume the caller has the keys already and
+5 -2
View File
@@ -51,8 +51,11 @@ async fn block_until_coconut_is_available<C: Clone + CosmWasmClient + Send + Syn
break;
} else {
// Use 20 additional seconds to avoid the exact moment of going into the final epoch state
let secs_until_final = epoch.final_timestamp_secs() + 20 - current_timestamp_secs;
// Use 1 additional second to not start the next iteration immediately and spam get_current_epoch queries
let secs_until_final = epoch
.final_timestamp_secs()
.saturating_sub(current_timestamp_secs)
+ 1;
info!("Approximately {} seconds until coconut is available. Sleeping until then. You can safely kill the process at any moment.", secs_until_final);
std::thread::sleep(Duration::from_secs(secs_until_final));
}
+6
View File
@@ -25,6 +25,11 @@ pub(crate) struct Init {
#[clap(long)]
gateway: Option<identity::PublicKey>,
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
/// uniformly.
#[clap(long, conflicts_with = "gateway")]
latency_based_selection: bool,
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
/// potentially causing loss of access.
#[clap(long)]
@@ -143,6 +148,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), ClientError> {
register_gateway,
user_chosen_gateway_id,
config.get_base(),
args.latency_based_selection,
)
.await
.tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?;
+6
View File
@@ -37,6 +37,11 @@ pub(crate) struct Init {
#[clap(long)]
gateway: Option<identity::PublicKey>,
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
/// uniformly.
#[clap(long, conflicts_with = "gateway")]
latency_based_selection: bool,
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
/// potentially causing loss of access.
#[clap(long)]
@@ -149,6 +154,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), Socks5ClientError> {
register_gateway,
user_chosen_gateway_id,
config.get_base(),
args.latency_based_selection,
)
.await
.tap_err(|err| eprintln!("Failed to setup gateway\nError: {err}"))?;
@@ -11,9 +11,7 @@ use nym_api_requests::models::{
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
RewardEstimationResponse, StakeSaturationResponse,
};
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::MixId;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef};
pub use nym_mixnet_contract_common::{mixnode::MixNodeDetails, GatewayBond, IdentityKeyRef, MixId};
#[cfg(feature = "nyxd-client")]
use crate::nyxd::traits::{DkgQueryClient, MixnetQueryClient, MultisigQueryClient};
@@ -80,7 +80,7 @@ pub(crate) mod tests {
use crate::epoch_state::transactions::advance_epoch_state;
use crate::support::tests::fixtures::dealer_details_fixture;
use crate::support::tests::helpers;
use crate::support::tests::helpers::GROUP_MEMBERS;
use crate::support::tests::helpers::{add_fixture_dealer, GROUP_MEMBERS};
use coconut_dkg_common::types::{InitialReplacementData, TimeConfiguration};
use cosmwasm_std::testing::{mock_env, mock_info};
use cw4::Member;
@@ -147,6 +147,8 @@ pub(crate) mod tests {
.block
.time
.plus_seconds(TimeConfiguration::default().public_key_submission_time_secs);
add_fixture_dealer(deps.as_mut());
advance_epoch_state(deps.as_mut(), env).unwrap();
let ret = try_add_dealer(
@@ -53,6 +53,7 @@ pub(crate) mod tests {
use crate::epoch_state::transactions::advance_epoch_state;
use crate::support::tests::fixtures::{dealer_details_fixture, dealing_bytes_fixture};
use crate::support::tests::helpers;
use crate::support::tests::helpers::add_fixture_dealer;
use coconut_dkg_common::dealer::DealerDetails;
use coconut_dkg_common::types::{InitialReplacementData, TimeConfiguration};
use cosmwasm_std::testing::{mock_env, mock_info};
@@ -80,6 +81,7 @@ pub(crate) mod tests {
.block
.time
.plus_seconds(TimeConfiguration::default().public_key_submission_time_secs);
add_fixture_dealer(deps.as_mut());
advance_epoch_state(deps.as_mut(), env).unwrap();
let ret = try_commit_dealings(deps.as_mut(), info.clone(), dealing_bytes.clone(), false)
@@ -7,6 +7,7 @@ use crate::epoch_state::storage::{CURRENT_EPOCH, INITIAL_REPLACEMENT_DATA, THRES
use crate::epoch_state::utils::check_epoch_state;
use crate::error::ContractError;
use crate::state::STATE;
use crate::verification_key_shares::storage::vk_shares;
use coconut_dkg_common::types::{Epoch, EpochState, InitialReplacementData};
use cosmwasm_std::{Addr, Deps, DepsMut, Env, Order, Response, Storage};
@@ -89,23 +90,29 @@ pub(crate) fn advance_epoch_state(deps: DepsMut<'_>, env: Env) -> Result<Respons
let current_epoch = CURRENT_EPOCH.load(deps.storage)?;
let next_epoch = if let Some(state) = current_epoch.state.next() {
// We are during DKG process
let mut new_state = state;
if let EpochState::DealingExchange { resharing } = state {
let current_dealers = current_dealers()
.keys(deps.storage, None, None, Order::Ascending)
.collect::<Result<Vec<Addr>, _>>()?;
// note: ceiling in integer division can be achieved via q = (x + y - 1) / y;
let threshold = (2 * current_dealers.len() as u64 + 3 - 1) / 3;
THRESHOLD.save(deps.storage, &threshold)?;
if !resharing {
let replacement_data = InitialReplacementData {
initial_dealers: current_dealers,
initial_height: None,
};
INITIAL_REPLACEMENT_DATA.save(deps.storage, &replacement_data)?;
if current_dealers.is_empty() {
// If no dealer registered yet, we just stay in the same state until there's at least one
new_state = current_epoch.state;
} else {
// note: ceiling in integer division can be achieved via q = (x + y - 1) / y;
let threshold = (2 * current_dealers.len() as u64 + 3 - 1) / 3;
THRESHOLD.save(deps.storage, &threshold)?;
if !resharing {
let replacement_data = InitialReplacementData {
initial_dealers: current_dealers,
initial_height: None,
};
INITIAL_REPLACEMENT_DATA.save(deps.storage, &replacement_data)?;
}
}
}
};
Epoch::new(
state,
new_state,
current_epoch.epoch_id,
current_epoch.time_configuration,
env.block.time,
@@ -152,9 +159,16 @@ pub(crate) fn try_surpassed_threshold(
check_epoch_state(deps.storage, EpochState::InProgress)?;
let threshold = THRESHOLD.load(deps.storage)?;
let dealers = current_dealers()
.keys(deps.storage, None, None, Order::Ascending)
.flatten();
let dealers = vk_shares()
.range(deps.storage, None, None, Order::Ascending)
.flatten()
.filter_map(|(_, share)| {
if share.verified {
Some(share.owner)
} else {
None
}
});
if dealers_still_active(&deps.as_ref(), dealers)? < threshold as usize {
reset_epoch_state(deps.storage)?;
CURRENT_EPOCH.update::<_, ContractError>(deps.storage, |epoch| {
@@ -174,7 +188,7 @@ pub(crate) fn try_surpassed_threshold(
pub(crate) mod tests {
use super::*;
use crate::error::ContractError::EarlyEpochStateAdvancement;
use crate::support::tests::fixtures::dealer_details_fixture;
use crate::support::tests::fixtures::{dealer_details_fixture, vk_share_fixture};
use crate::support::tests::helpers::{init_contract, GROUP_MEMBERS};
use coconut_dkg_common::types::{
ContractSafeBytes, DealerDetails, EpochState, TimeConfiguration,
@@ -392,6 +406,14 @@ pub(crate) mod tests {
EarlyEpochStateAdvancement(1)
);
env.block.time = env.block.time.plus_seconds(1);
advance_epoch_state(deps.as_mut(), env.clone()).unwrap();
let epoch = CURRENT_EPOCH.load(deps.as_mut().storage).unwrap();
assert_eq!(
epoch.state,
EpochState::PublicKeySubmission { resharing: false }
);
// setup dealer details
let all_details: [_; 4] = std::array::from_fn(|i| dealer_details_fixture(i as u64 + 1));
for details in all_details.iter() {
@@ -404,7 +426,7 @@ pub(crate) mod tests {
.may_load(&deps.storage)
.unwrap()
.is_none());
env.block.time = env.block.time.plus_seconds(1);
env.block.time = env.block.time.plus_seconds(epoch.time_configuration.public_key_submission_time_secs);
advance_epoch_state(deps.as_mut(), env.clone()).unwrap();
let epoch = CURRENT_EPOCH.load(deps.as_mut().storage).unwrap();
assert_eq!(
@@ -664,6 +686,12 @@ pub(crate) mod tests {
.save(deps.as_mut().storage, &details.address, details)
.unwrap();
}
let all_shares: [_; 3] = std::array::from_fn(|i| vk_share_fixture(&format!("owner{}", i + 1), 0));
for share in all_shares.iter() {
vk_shares()
.save(deps.as_mut().storage, (&share.owner, share.epoch_id), share)
.unwrap();
}
for times in [
time_configuration.public_key_submission_time_secs,
@@ -2,11 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use crate::contract::instantiate;
use crate::dealers::storage::current_dealers;
use coconut_dkg_common::msg::InstantiateMsg;
use coconut_dkg_common::types::DealerDetails;
use cosmwasm_std::testing::{mock_dependencies, mock_env, mock_info, MockApi, MockQuerier};
use cosmwasm_std::{
from_binary, to_binary, ContractResult, Empty, MemoryStorage, OwnedDeps, QuerierResult,
SystemResult, WasmQuery,
from_binary, to_binary, Addr, ContractResult, DepsMut, Empty, MemoryStorage, OwnedDeps,
QuerierResult, SystemResult, WasmQuery,
};
use cw4::{Cw4QueryMsg, Member, MemberListResponse, MemberResponse};
use lazy_static::lazy_static;
@@ -22,6 +24,22 @@ lazy_static! {
pub static ref GROUP_MEMBERS: Mutex<Vec<(Member, u64)>> = Mutex::new(vec![]);
}
pub fn add_fixture_dealer(deps: DepsMut<'_>) {
let owner = Addr::unchecked("owner");
current_dealers()
.save(
deps.storage,
&owner,
&DealerDetails {
address: owner.clone(),
bte_public_key_with_proof: String::new(),
announce_address: String::new(),
assigned_index: 100,
},
)
.unwrap();
}
fn querier_handler(query: &WasmQuery) -> QuerierResult {
let bin = match query {
WasmQuery::Smart { contract_addr, msg } => {
@@ -91,7 +91,7 @@ mod tests {
use super::*;
use crate::epoch_state::transactions::advance_epoch_state;
use crate::support::tests::helpers;
use crate::support::tests::helpers::MULTISIG_CONTRACT;
use crate::support::tests::helpers::{add_fixture_dealer, MULTISIG_CONTRACT};
use coconut_dkg_common::dealer::DealerDetails;
use coconut_dkg_common::types::{EpochState, TimeConfiguration};
use cosmwasm_std::testing::{mock_env, mock_info};
@@ -104,6 +104,7 @@ mod tests {
let info = mock_info("requester", &[]);
let share = "share".to_string();
add_fixture_dealer(deps.as_mut());
env.block.time = env
.block
.time
@@ -171,6 +172,7 @@ mod tests {
.to_string()
}
);
add_fixture_dealer(deps.as_mut());
env.block.time = env
.block
.time
@@ -247,6 +249,7 @@ mod tests {
}
);
add_fixture_dealer(deps.as_mut());
env.block.time = env
.block
.time
@@ -292,6 +295,7 @@ mod tests {
let share = "share".to_string();
let multisig_info = mock_info(MULTISIG_CONTRACT, &[]);
add_fixture_dealer(deps.as_mut());
env.block.time = env
.block
.time
+58 -49
View File
@@ -99,57 +99,66 @@ impl<R: RngCore + Clone> DkgController<R> {
return;
}
if let Err(err) = self.state.is_consistent(epoch.state).await {
error!("Epoch state is corrupted - {err}, the process should be terminated");
return;
}
let ret = match epoch.state {
EpochState::PublicKeySubmission { resharing } => {
public_key_submission(&self.dkg_client, &mut self.state, resharing).await
}
EpochState::DealingExchange { resharing } => {
dealing_exchange(
&self.dkg_client,
&mut self.state,
self.rng.clone(),
resharing,
)
.await
}
EpochState::VerificationKeySubmission { resharing } => {
let keypair_path = nym_pemstore::KeyPairPath::new(
self.secret_key_path.clone(),
self.verification_key_path.clone(),
);
verification_key_submission(
&self.dkg_client,
&mut self.state,
&keypair_path,
resharing,
)
.await
}
EpochState::VerificationKeyValidation { resharing } => {
verification_key_validation(&self.dkg_client, &mut self.state, resharing)
debug!("Epoch state is corrupted - {err}. Awaiting for a DKG restart.");
} else {
let ret = match epoch.state {
EpochState::PublicKeySubmission { resharing } => {
public_key_submission(&self.dkg_client, &mut self.state, resharing)
.await
}
EpochState::DealingExchange { resharing } => {
dealing_exchange(
&self.dkg_client,
&mut self.state,
self.rng.clone(),
resharing,
)
.await
}
EpochState::VerificationKeyFinalization { resharing } => {
verification_key_finalization(&self.dkg_client, &mut self.state, resharing)
}
EpochState::VerificationKeySubmission { resharing } => {
let keypair_path = nym_pemstore::KeyPairPath::new(
self.secret_key_path.clone(),
self.verification_key_path.clone(),
);
verification_key_submission(
&self.dkg_client,
&mut self.state,
&keypair_path,
resharing,
)
.await
}
// Just wait, in case we need to redo dkg at some point
EpochState::InProgress => {
self.state.set_was_in_progress();
Ok(())
}
};
if let Err(err) = ret {
warn!("Could not handle this iteration for the epoch state: {err}");
} else if epoch.state != EpochState::InProgress {
let persistent_state = PersistentState::from(&self.state);
if let Err(err) =
persistent_state.save_to_file(self.state.persistent_state_path())
{
warn!("Could not backup the state for this iteration: {err}");
}
EpochState::VerificationKeyValidation { resharing } => {
verification_key_validation(
&self.dkg_client,
&mut self.state,
resharing,
)
.await
}
EpochState::VerificationKeyFinalization { resharing } => {
verification_key_finalization(
&self.dkg_client,
&mut self.state,
resharing,
)
.await
}
// Just wait, in case we need to redo dkg at some point
EpochState::InProgress => {
self.state.set_was_in_progress();
Ok(())
}
};
if let Err(err) = ret {
warn!("Could not handle this iteration for the epoch state: {err}");
} else if epoch.state != EpochState::InProgress {
let persistent_state = PersistentState::from(&self.state);
if let Err(err) =
persistent_state.save_to_file(self.state.persistent_state_path())
{
warn!("Could not backup the state for this iteration: {err}");
}
}
}
if let Ok(current_timestamp) =
+1 -1
View File
@@ -133,7 +133,7 @@ impl ConsistentState for State {
fn proposal_id_value(&self) -> Result<u64, CoconutError> {
self.proposal_id.ok_or(CoconutError::UnrecoverableState {
reason: String::from("Proposal id should have benn set"),
reason: String::from("Proposal id should have been set"),
})
}
}
+1 -1
View File
@@ -91,7 +91,7 @@ pub enum CoconutError {
#[error("Failed to recover assigned node index: {reason}")]
NodeIndexRecoveryError { reason: String },
#[error("Unrecoverable state: {reason}. Process should be restarted")]
#[error("Unrecoverable state: {reason}")]
UnrecoverableState { reason: String },
#[error("DKG has not finished yet in order to derive the coconut key")]
@@ -138,6 +138,8 @@ pub async fn init_socks5_config(provider_address: String, chosen_gateway_id: Str
register_gateway,
Some(chosen_gateway_id),
config.get_base(),
// TODO: another instance where this setting should probably get used
false,
)
.await?;
@@ -118,6 +118,7 @@ pub async fn init_socks5_config(
&mut key_manager,
nym_api_endpoints,
Some(chosen_gateway_id),
false,
)
.await?;
+2
View File
@@ -259,6 +259,8 @@ where
&mut self.key_manager,
self.config.nym_api_endpoints.clone(),
user_chosen_gateway,
// TODO: this should probably be configurable with the config
false,
)
.await?;
@@ -24,6 +24,11 @@ pub(crate) struct Init {
#[clap(long)]
gateway: Option<identity::PublicKey>,
/// Specifies whether the new gateway should be determined based by latency as opposed to being chosen
/// uniformly.
#[clap(long, conflicts_with = "gateway")]
latency_based_selection: bool,
/// Force register gateway. WARNING: this will overwrite any existing keys for the given id,
/// potentially causing loss of access.
#[clap(long)]
@@ -115,6 +120,7 @@ pub(crate) async fn execute(args: &Init) -> Result<(), NetworkRequesterError> {
register_gateway,
user_chosen_gateway_id,
config.get_base(),
args.latency_based_selection,
)
.await
.map_err(|source| {