Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d6a81d9213 | |||
| a6db5fe704 | |||
| fe57d08f3e | |||
| ae29b2300c | |||
| 7b98d62f96 | |||
| 6abe95ed61 |
Generated
+6
-2
@@ -1063,6 +1063,8 @@ dependencies = [
|
|||||||
"pemstore",
|
"pemstore",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
"rand_chacha 0.2.2",
|
"rand_chacha 0.2.2",
|
||||||
|
"serde",
|
||||||
|
"serde_bytes",
|
||||||
"subtle-encoding",
|
"subtle-encoding",
|
||||||
"x25519-dalek",
|
"x25519-dalek",
|
||||||
]
|
]
|
||||||
@@ -1427,6 +1429,7 @@ version = "1.4.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d5c4b5e5959dc2c2b89918d8e2cc40fcdd623cef026ed09d2f0ee05199dc8e4"
|
checksum = "3d5c4b5e5959dc2c2b89918d8e2cc40fcdd623cef026ed09d2f0ee05199dc8e4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"serde",
|
||||||
"signature",
|
"signature",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -1440,6 +1443,7 @@ dependencies = [
|
|||||||
"ed25519",
|
"ed25519",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_bytes",
|
||||||
"sha2",
|
"sha2",
|
||||||
"zeroize",
|
"zeroize",
|
||||||
]
|
]
|
||||||
@@ -4834,9 +4838,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_bytes"
|
name = "serde_bytes"
|
||||||
version = "0.11.5"
|
version = "0.11.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9"
|
checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -324,6 +324,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
|
|||||||
|
|
||||||
Ok(ExecuteResult {
|
Ok(ExecuteResult {
|
||||||
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
||||||
|
data: tx_res.tx_result.data,
|
||||||
transaction_hash: tx_res.hash,
|
transaction_hash: tx_res.hash,
|
||||||
gas_info,
|
gas_info,
|
||||||
})
|
})
|
||||||
@@ -364,6 +365,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
|
|||||||
|
|
||||||
Ok(ExecuteResult {
|
Ok(ExecuteResult {
|
||||||
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
||||||
|
data: tx_res.tx_result.data,
|
||||||
transaction_hash: tx_res.hash,
|
transaction_hash: tx_res.hash,
|
||||||
gas_info,
|
gas_info,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
|
|||||||
CodeInfoResponse, ContractCodeHistoryEntry as ProtoContractCodeHistoryEntry,
|
CodeInfoResponse, ContractCodeHistoryEntry as ProtoContractCodeHistoryEntry,
|
||||||
ContractCodeHistoryOperationType, ContractInfo as ProtoContractInfo,
|
ContractCodeHistoryOperationType, ContractInfo as ProtoContractInfo,
|
||||||
};
|
};
|
||||||
|
use cosmrs::tendermint::abci::Data;
|
||||||
use cosmrs::tendermint::{abci, chain};
|
use cosmrs::tendermint::{abci, chain};
|
||||||
use cosmrs::tx::{AccountNumber, Gas, SequenceNumber};
|
use cosmrs::tx::{AccountNumber, Gas, SequenceNumber};
|
||||||
use cosmrs::{tx, AccountId, Any, Coin};
|
use cosmrs::{tx, AccountId, Any, Coin};
|
||||||
@@ -672,6 +673,8 @@ pub struct MigrateResult {
|
|||||||
pub struct ExecuteResult {
|
pub struct ExecuteResult {
|
||||||
pub logs: Vec<Log>,
|
pub logs: Vec<Log>,
|
||||||
|
|
||||||
|
pub data: Data,
|
||||||
|
|
||||||
/// Transaction hash (might be used as transaction ID)
|
/// Transaction hash (might be used as transaction ID)
|
||||||
pub transaction_hash: tx::Hash,
|
pub transaction_hash: tx::Hash,
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,8 @@ cipher = { version = "0.4.3", optional = true }
|
|||||||
x25519-dalek = { version = "1.1", optional = true }
|
x25519-dalek = { version = "1.1", optional = true }
|
||||||
ed25519-dalek = { version = "1.0", optional = true }
|
ed25519-dalek = { version = "1.0", optional = true }
|
||||||
rand = { version = "0.7.3", features = ["wasm-bindgen"], optional = true }
|
rand = { version = "0.7.3", features = ["wasm-bindgen"], optional = true }
|
||||||
|
serde_bytes = { version = "0.11.6", optional = true }
|
||||||
|
serde_crate = { version = "1.0", optional = true, default_features = false, package = "serde" }
|
||||||
subtle-encoding = { version = "0.5", features = ["bech32-preview"]}
|
subtle-encoding = { version = "0.5", features = ["bech32-preview"]}
|
||||||
|
|
||||||
# internal
|
# internal
|
||||||
@@ -30,6 +32,7 @@ config = { path="../../common/config" }
|
|||||||
rand_chacha = "0.2"
|
rand_chacha = "0.2"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
serde = ["serde_crate", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
|
||||||
asymmetric = ["x25519-dalek", "ed25519-dalek"]
|
asymmetric = ["x25519-dalek", "ed25519-dalek"]
|
||||||
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
|
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
|
||||||
symmetric = ["aes", "ctr", "cipher", "generic-array"]
|
symmetric = ["aes", "ctr", "cipher", "generic-array"]
|
||||||
|
|||||||
@@ -4,6 +4,8 @@
|
|||||||
use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||||
#[cfg(feature = "rand")]
|
#[cfg(feature = "rand")]
|
||||||
use rand::{CryptoRng, RngCore};
|
use rand::{CryptoRng, RngCore};
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
use std::fmt::{self, Display, Formatter};
|
use std::fmt::{self, Display, Formatter};
|
||||||
|
|
||||||
/// Size of a X25519 private key
|
/// Size of a X25519 private key
|
||||||
@@ -127,6 +129,28 @@ impl PublicKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl Serialize for PublicKey {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
self.0.serialize(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl<'d> Deserialize<'d> for PublicKey {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'d>,
|
||||||
|
{
|
||||||
|
Ok(PublicKey(x25519_dalek::PublicKey::deserialize(
|
||||||
|
deserializer,
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PemStorableKey for PublicKey {
|
impl PemStorableKey for PublicKey {
|
||||||
type Error = KeyRecoveryError;
|
type Error = KeyRecoveryError;
|
||||||
|
|
||||||
@@ -143,7 +167,6 @@ impl PemStorableKey for PublicKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct PrivateKey(x25519_dalek::StaticSecret);
|
pub struct PrivateKey(x25519_dalek::StaticSecret);
|
||||||
|
|
||||||
impl Display for PrivateKey {
|
impl Display for PrivateKey {
|
||||||
@@ -187,6 +210,28 @@ impl PrivateKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl Serialize for PrivateKey {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
self.0.serialize(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl<'d> Deserialize<'d> for PrivateKey {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'d>,
|
||||||
|
{
|
||||||
|
Ok(PrivateKey(x25519_dalek::StaticSecret::deserialize(
|
||||||
|
deserializer,
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PemStorableKey for PrivateKey {
|
impl PemStorableKey for PrivateKey {
|
||||||
type Error = KeyRecoveryError;
|
type Error = KeyRecoveryError;
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,13 @@ use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
|||||||
use rand::{CryptoRng, RngCore};
|
use rand::{CryptoRng, RngCore};
|
||||||
use std::fmt::{self, Display, Formatter};
|
use std::fmt::{self, Display, Formatter};
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
use serde::de::Error as SerdeError;
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
use serde_bytes::{ByteBuf as SerdeByteBuf, Bytes as SerdeBytes};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Ed25519RecoveryError {
|
pub enum Ed25519RecoveryError {
|
||||||
MalformedBytes(SignatureError),
|
MalformedBytes(SignatureError),
|
||||||
@@ -40,6 +47,7 @@ impl fmt::Display for Ed25519RecoveryError {
|
|||||||
impl std::error::Error for Ed25519RecoveryError {}
|
impl std::error::Error for Ed25519RecoveryError {}
|
||||||
|
|
||||||
/// Keypair for usage in ed25519 EdDSA.
|
/// Keypair for usage in ed25519 EdDSA.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct KeyPair {
|
pub struct KeyPair {
|
||||||
private_key: PrivateKey,
|
private_key: PrivateKey,
|
||||||
public_key: PublicKey,
|
public_key: PublicKey,
|
||||||
@@ -135,6 +143,28 @@ impl PublicKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl Serialize for PublicKey {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
self.0.serialize(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl<'d> Deserialize<'d> for PublicKey {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'d>,
|
||||||
|
{
|
||||||
|
Ok(PublicKey(ed25519_dalek::PublicKey::deserialize(
|
||||||
|
deserializer,
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PemStorableKey for PublicKey {
|
impl PemStorableKey for PublicKey {
|
||||||
type Error = Ed25519RecoveryError;
|
type Error = Ed25519RecoveryError;
|
||||||
|
|
||||||
@@ -200,6 +230,28 @@ impl PrivateKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl Serialize for PrivateKey {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
self.0.serialize(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl<'d> Deserialize<'d> for PrivateKey {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'d>,
|
||||||
|
{
|
||||||
|
Ok(PrivateKey(ed25519_dalek::SecretKey::deserialize(
|
||||||
|
deserializer,
|
||||||
|
)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PemStorableKey for PrivateKey {
|
impl PemStorableKey for PrivateKey {
|
||||||
type Error = Ed25519RecoveryError;
|
type Error = Ed25519RecoveryError;
|
||||||
|
|
||||||
@@ -216,7 +268,7 @@ impl PemStorableKey for PrivateKey {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub struct Signature(ed25519_dalek::Signature);
|
pub struct Signature(ed25519_dalek::Signature);
|
||||||
|
|
||||||
impl Signature {
|
impl Signature {
|
||||||
@@ -237,3 +289,24 @@ impl Signature {
|
|||||||
Ok(Signature(ed25519_dalek::Signature::from_bytes(bytes)?))
|
Ok(Signature(ed25519_dalek::Signature::from_bytes(bytes)?))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl Serialize for Signature {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
SerdeBytes::new(&self.to_bytes()).serialize(serializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "serde")]
|
||||||
|
impl<'d> Deserialize<'d> for Signature {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'d>,
|
||||||
|
{
|
||||||
|
let bytes = <SerdeByteBuf>::deserialize(deserializer)?;
|
||||||
|
Signature::from_bytes(bytes.as_ref()).map_err(SerdeError::custom)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -29,5 +29,5 @@ pub use blake3;
|
|||||||
#[cfg(feature = "symmetric")]
|
#[cfg(feature = "symmetric")]
|
||||||
pub use ctr;
|
pub use ctr;
|
||||||
|
|
||||||
// TODO: this function uses all three modules: asymmetric crypto, symmetric crypto and derives key...,
|
#[cfg(feature = "serde")]
|
||||||
// so I don't know where to put it...
|
extern crate serde_crate as serde;
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ cfg_if::cfg_if! {
|
|||||||
if #[cfg(network = "mainnet")] {
|
if #[cfg(network = "mainnet")] {
|
||||||
pub const DEFAULT_NETWORK: all::Network = all::Network::MAINNET;
|
pub const DEFAULT_NETWORK: all::Network = all::Network::MAINNET;
|
||||||
pub const DENOM: &str = mainnet::DENOM;
|
pub const DENOM: &str = mainnet::DENOM;
|
||||||
|
pub const STAKE_DENOM: &str = mainnet::STAKE_DENOM;
|
||||||
|
|
||||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_CONTRACT_ADDRESS;
|
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_CONTRACT_ADDRESS;
|
||||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_ERC20_CONTRACT_ADDRESS;
|
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||||
@@ -25,6 +26,7 @@ cfg_if::cfg_if! {
|
|||||||
} else if #[cfg(network = "qa")] {
|
} else if #[cfg(network = "qa")] {
|
||||||
pub const DEFAULT_NETWORK: all::Network = all::Network::QA;
|
pub const DEFAULT_NETWORK: all::Network = all::Network::QA;
|
||||||
pub const DENOM: &str = qa::DENOM;
|
pub const DENOM: &str = qa::DENOM;
|
||||||
|
pub const STAKE_DENOM: &str = qa::STAKE_DENOM;
|
||||||
|
|
||||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_CONTRACT_ADDRESS;
|
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_CONTRACT_ADDRESS;
|
||||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_ERC20_CONTRACT_ADDRESS;
|
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||||
@@ -32,6 +34,7 @@ cfg_if::cfg_if! {
|
|||||||
} else if #[cfg(network = "sandbox")] {
|
} else if #[cfg(network = "sandbox")] {
|
||||||
pub const DEFAULT_NETWORK: all::Network = all::Network::SANDBOX;
|
pub const DEFAULT_NETWORK: all::Network = all::Network::SANDBOX;
|
||||||
pub const DENOM: &str = sandbox::DENOM;
|
pub const DENOM: &str = sandbox::DENOM;
|
||||||
|
pub const STAKE_DENOM: &str = sandbox::STAKE_DENOM;
|
||||||
|
|
||||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_CONTRACT_ADDRESS;
|
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_CONTRACT_ADDRESS;
|
||||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_ERC20_CONTRACT_ADDRESS;
|
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
|||||||
|
|
||||||
pub(crate) const BECH32_PREFIX: &str = "n";
|
pub(crate) const BECH32_PREFIX: &str = "n";
|
||||||
pub const DENOM: &str = "unym";
|
pub const DENOM: &str = "unym";
|
||||||
|
pub const STAKE_DENOM: &str = "unyx";
|
||||||
|
|
||||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
||||||
"n14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9sjyvg3g";
|
"n14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9sjyvg3g";
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
|||||||
|
|
||||||
pub(crate) const BECH32_PREFIX: &str = "n";
|
pub(crate) const BECH32_PREFIX: &str = "n";
|
||||||
pub const DENOM: &str = "unym";
|
pub const DENOM: &str = "unym";
|
||||||
|
pub const STAKE_DENOM: &str = "unyx";
|
||||||
|
|
||||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
||||||
"n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep";
|
"n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep";
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
|||||||
|
|
||||||
pub(crate) const BECH32_PREFIX: &str = "nymt";
|
pub(crate) const BECH32_PREFIX: &str = "nymt";
|
||||||
pub const DENOM: &str = "unymt";
|
pub const DENOM: &str = "unymt";
|
||||||
|
pub const STAKE_DENOM: &str = "unyxt";
|
||||||
|
|
||||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str = "nymt1ghd753shjuwexxywmgs4xz7x2q732vcnstz02j";
|
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str = "nymt1ghd753shjuwexxywmgs4xz7x2q732vcnstz02j";
|
||||||
pub(crate) const VESTING_CONTRACT_ADDRESS: &str = "nymt14ejqjyq8um4p3xfqj74yld5waqljf88fn549lh";
|
pub(crate) const VESTING_CONTRACT_ADDRESS: &str = "nymt14ejqjyq8um4p3xfqj74yld5waqljf88fn549lh";
|
||||||
|
|||||||
@@ -45,6 +45,9 @@ mod rewarded_set_updater;
|
|||||||
pub(crate) mod storage;
|
pub(crate) mod storage;
|
||||||
mod swagger;
|
mod swagger;
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
mod networking;
|
||||||
|
|
||||||
#[cfg(feature = "coconut")]
|
#[cfg(feature = "coconut")]
|
||||||
mod coconut;
|
mod coconut;
|
||||||
|
|
||||||
@@ -590,6 +593,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
|
|||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
println!("Starting validator api...");
|
println!("Starting validator api...");
|
||||||
|
|
||||||
|
dotenv::dotenv()?;
|
||||||
|
|
||||||
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
|
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
|
||||||
// instriment tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
// instriment tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||||
console_subscriber::init();
|
console_subscriber::init();
|
||||||
|
|||||||
@@ -0,0 +1,65 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::error::NetworkingError;
|
||||||
|
use crate::networking::message::{Header, OffchainMessage};
|
||||||
|
use crate::networking::PROTOCOL_VERSION;
|
||||||
|
use bytes::{Buf, BytesMut};
|
||||||
|
use tokio_util::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
// TODO: that was fine for the purposes of DKG, we might want to increase it if it's used anywhere else
|
||||||
|
const MAX_ALLOWED_MESSAGE_LEN: u64 = 2 * 1024 * 1024;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct OffchainCodec;
|
||||||
|
|
||||||
|
impl<'a> Encoder<&'a OffchainMessage> for OffchainCodec {
|
||||||
|
type Error = NetworkingError;
|
||||||
|
|
||||||
|
fn encode(&mut self, item: &OffchainMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
|
item.encode(dst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Decoder for OffchainCodec {
|
||||||
|
type Item = OffchainMessage;
|
||||||
|
type Error = NetworkingError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
if src.len() < Header::LEN {
|
||||||
|
// can't do much without being able to deserialize the header
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// header deserialization is simple enough to not be too cumbersome if it were to be repeated
|
||||||
|
let header = Header::try_from_bytes(&src[..Header::LEN])?;
|
||||||
|
|
||||||
|
if header.payload_length > MAX_ALLOWED_MESSAGE_LEN {
|
||||||
|
return Err(NetworkingError::MessageTooLarge {
|
||||||
|
supported: MAX_ALLOWED_MESSAGE_LEN,
|
||||||
|
received: header.payload_length,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if header.protocol_version != PROTOCOL_VERSION {
|
||||||
|
return Err(NetworkingError::MismatchedProtocolVersion {
|
||||||
|
expected: PROTOCOL_VERSION,
|
||||||
|
received: header.protocol_version,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if src.len() < Header::LEN + header.payload_length as usize {
|
||||||
|
// we haven't received the entire expected message yet.
|
||||||
|
// However, reserve enough bytes in the buffer for it
|
||||||
|
src.reserve(Header::LEN + header.payload_length as usize - src.len());
|
||||||
|
|
||||||
|
// We inform the Framed that we need more bytes to form the next frame.
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let payload = src[Header::LEN..Header::LEN + header.payload_length as usize].to_vec();
|
||||||
|
src.advance(Header::LEN + header.payload_length as usize);
|
||||||
|
|
||||||
|
Ok(Some(OffchainMessage::try_from_bytes(payload)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use std::io;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
// this would probably need to adjusted or maybe incorporated into existing stuff as originally
|
||||||
|
// this error was more extensive by being a global `DkgError`
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum NetworkingError {
|
||||||
|
#[error("Networking / IO error - {0}")]
|
||||||
|
Io(#[from] io::Error),
|
||||||
|
|
||||||
|
#[error("Received message with specified size bigger than the supported maximum. Received: {received}, supported: {supported}")]
|
||||||
|
MessageTooLarge { supported: u64, received: u64 },
|
||||||
|
|
||||||
|
#[error("Received message with unexpected protocol version. Received: {received}, expected: {expected}")]
|
||||||
|
MismatchedProtocolVersion { expected: u32, received: u32 },
|
||||||
|
|
||||||
|
#[error("Failed to deal with serialization (or deserialization) of the message - {0}")]
|
||||||
|
SerializationError(#[from] bincode::Error),
|
||||||
|
}
|
||||||
@@ -0,0 +1,165 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::error::NetworkingError;
|
||||||
|
use crate::networking::PROTOCOL_VERSION;
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
use std::io;
|
||||||
|
use std::time::Duration;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
// I left a sample `NewDealingMessage` to show how it was originally implemented
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum OffchainMessage {
|
||||||
|
// you'd add something like this:
|
||||||
|
// NewDealing {
|
||||||
|
// id: u64,
|
||||||
|
// message: NewDealingMessage,
|
||||||
|
// },
|
||||||
|
ErrorResponse {
|
||||||
|
id: Option<u64>,
|
||||||
|
message: ErrorResponseMessage,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for OffchainMessage {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "OffchainMessage ")?;
|
||||||
|
match self {
|
||||||
|
// OffchainDkgMessage::NewDealing { id, message } => {
|
||||||
|
// write!(f, "with id {} and message: {}", id, message)
|
||||||
|
// }
|
||||||
|
OffchainMessage::ErrorResponse { id, message } => {
|
||||||
|
write!(f, "with id {:?} and message: {}", id, message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
// pub struct NewDealingMessage {
|
||||||
|
// pub epoch_id: u32,
|
||||||
|
// pub dealing_bytes: Vec<u8>,
|
||||||
|
// pub dealer_signature: identity::Signature,
|
||||||
|
// }
|
||||||
|
// impl Display for NewDealingMessage {
|
||||||
|
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
// write!(
|
||||||
|
// f,
|
||||||
|
// "NewDealingMessage for epoch {} with length {}",
|
||||||
|
// self.epoch_id,
|
||||||
|
// self.dealing_bytes.len()
|
||||||
|
// )
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
|
||||||
|
pub enum ErrorResponseMessage {
|
||||||
|
#[error("{typ} is not a valid request type")]
|
||||||
|
InvalidRequest { typ: String },
|
||||||
|
|
||||||
|
#[error("This request failed to get resolved within {} seconds", .timeout.as_secs())]
|
||||||
|
Timeout { timeout: Duration },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OffchainMessage {
|
||||||
|
pub(crate) fn new_error_response(id: Option<u64>, message: ErrorResponseMessage) -> Self {
|
||||||
|
OffchainMessage::ErrorResponse { id, message }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn try_from_bytes(bytes: Vec<u8>) -> Result<Self, NetworkingError> {
|
||||||
|
Ok(bincode::deserialize(&bytes)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn try_to_bytes(&self) -> Result<Vec<u8>, NetworkingError> {
|
||||||
|
Ok(bincode::serialize(&self)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn frame(&self) -> Result<FramedOffchainDkgMessage, NetworkingError> {
|
||||||
|
let payload = self.try_to_bytes()?;
|
||||||
|
Ok(FramedOffchainDkgMessage {
|
||||||
|
header: Header {
|
||||||
|
payload_length: payload.len() as u64,
|
||||||
|
protocol_version: PROTOCOL_VERSION,
|
||||||
|
},
|
||||||
|
payload,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn encode(&self, dst: &mut BytesMut) -> Result<(), NetworkingError> {
|
||||||
|
dst.put(self.frame()?.into_bytes().as_ref());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FramedOffchainDkgMessage {
|
||||||
|
header: Header,
|
||||||
|
payload: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FramedOffchainDkgMessage {
|
||||||
|
fn into_bytes(mut self) -> Vec<u8> {
|
||||||
|
let mut header_bytes = self.header.into_bytes();
|
||||||
|
let mut out = Vec::with_capacity(header_bytes.len() + self.payload.len());
|
||||||
|
|
||||||
|
out.append(&mut header_bytes);
|
||||||
|
out.append(&mut self.payload);
|
||||||
|
out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||||
|
pub(crate) struct Header {
|
||||||
|
pub(crate) payload_length: u64,
|
||||||
|
pub(crate) protocol_version: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Header {
|
||||||
|
pub(crate) const LEN: usize = 12;
|
||||||
|
|
||||||
|
pub(crate) fn into_bytes(self) -> Vec<u8> {
|
||||||
|
let mut out = Vec::with_capacity(Self::LEN);
|
||||||
|
out.extend_from_slice(&self.payload_length.to_be_bytes());
|
||||||
|
out.extend_from_slice(&self.protocol_version.to_be_bytes());
|
||||||
|
|
||||||
|
debug_assert_eq!(Self::LEN, out.len());
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn try_from_bytes(bytes: &[u8]) -> Result<Self, NetworkingError> {
|
||||||
|
if bytes.len() != Self::LEN {
|
||||||
|
return Err(NetworkingError::Io(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidData,
|
||||||
|
format!(
|
||||||
|
"OffchainMessageType::Header: got {} bytes, expected: {}",
|
||||||
|
bytes.len(),
|
||||||
|
Self::LEN
|
||||||
|
),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(Header {
|
||||||
|
payload_length: u64::from_be_bytes(bytes[..8].try_into().unwrap()),
|
||||||
|
protocol_version: u32::from_be_bytes(bytes[8..].try_into().unwrap()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::networking::PROTOCOL_VERSION;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn header_deserialization() {
|
||||||
|
let valid_header = Header {
|
||||||
|
payload_length: 1234,
|
||||||
|
protocol_version: PROTOCOL_VERSION,
|
||||||
|
};
|
||||||
|
|
||||||
|
let bytes = valid_header.into_bytes();
|
||||||
|
assert_eq!(valid_header, Header::try_from_bytes(&bytes).unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
// TODO: if this becomes too cumbersome, perhaps consider a more streamlined solution like tarpc
|
||||||
|
// (I wouldn't have needed that for DKG, but if this is to be used for different purposes, maybe
|
||||||
|
// it would have been more appropriate)
|
||||||
|
|
||||||
|
pub(crate) mod codec;
|
||||||
|
pub(crate) mod error;
|
||||||
|
pub(crate) mod message;
|
||||||
|
pub(crate) mod receiver;
|
||||||
|
pub(crate) mod sender;
|
||||||
|
|
||||||
|
pub(crate) const PROTOCOL_VERSION: u32 = 1;
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::codec::OffchainCodec;
|
||||||
|
use crate::networking::message::{ErrorResponseMessage, OffchainMessage};
|
||||||
|
// use crate::dkg::state::StateAccessor;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use tokio_util::codec::Framed;
|
||||||
|
|
||||||
|
const DEFAULT_MAX_CONNECTION_DURATION: Duration = Duration::from_secs(2 * 60 * 60);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectionHandler {
|
||||||
|
// connection cannot exist for more than this time
|
||||||
|
max_connection_duration: Duration,
|
||||||
|
// state_accessor: StateAccessor,
|
||||||
|
conn: Framed<TcpStream, OffchainCodec>,
|
||||||
|
remote: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionHandler {
|
||||||
|
pub(crate) fn new(conn: TcpStream, remote: SocketAddr) -> Self {
|
||||||
|
ConnectionHandler {
|
||||||
|
max_connection_duration: DEFAULT_MAX_CONNECTION_DURATION,
|
||||||
|
remote,
|
||||||
|
conn: Framed::new(conn, OffchainCodec),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_response(&mut self, response_message: OffchainMessage) {
|
||||||
|
if let Err(err) = self.conn.send(&response_message).await {
|
||||||
|
warn!("Failed to send response back to {} - {}", self.remote, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_error_response(&mut self, id: Option<u64>, error: ErrorResponseMessage) {
|
||||||
|
self.send_response(OffchainMessage::new_error_response(id, error))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_request(&mut self, request: OffchainMessage) {
|
||||||
|
match request {
|
||||||
|
OffchainMessage::ErrorResponse { id, .. } => {
|
||||||
|
self.send_error_response(
|
||||||
|
id,
|
||||||
|
ErrorResponseMessage::InvalidRequest {
|
||||||
|
typ: "ErrorResponse".into(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn _handle_connection(&mut self) {
|
||||||
|
debug!("Starting connection handler for {}", self.remote);
|
||||||
|
|
||||||
|
while let Some(framed_dkg_request) = self.conn.next().await {
|
||||||
|
trace!("received new message from {}", self.remote);
|
||||||
|
match framed_dkg_request {
|
||||||
|
Ok(framed_dkg_request) => self.handle_request(framed_dkg_request).await,
|
||||||
|
Err(err) => {
|
||||||
|
warn!(
|
||||||
|
"The socket connection got corrupted with error: {:?}. Closing the socket",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Closing connection from {}", self.remote);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_connection(mut self) {
|
||||||
|
let remote = self.remote;
|
||||||
|
if timeout(self.max_connection_duration, self._handle_connection())
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
"we timed out while trying to resolve connection from {}",
|
||||||
|
remote
|
||||||
|
);
|
||||||
|
self.send_error_response(
|
||||||
|
None,
|
||||||
|
ErrorResponseMessage::Timeout {
|
||||||
|
timeout: self.max_connection_duration,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,47 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::receiver::handler::ConnectionHandler;
|
||||||
|
use log::debug;
|
||||||
|
use std::fmt::Display;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::process;
|
||||||
|
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
|
||||||
|
|
||||||
|
// note that we do not expect persistent connections between dealers, they should only really
|
||||||
|
// exist for the duration of a single message exchange
|
||||||
|
pub struct Listener<A> {
|
||||||
|
address: A,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A> Listener<A> {
|
||||||
|
pub(crate) fn new(address: A) -> Self {
|
||||||
|
Listener { address }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_connect(&self, conn: TcpStream, remote: SocketAddr) {
|
||||||
|
tokio::spawn(ConnectionHandler::new(conn, remote).handle_connection());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn run(&mut self)
|
||||||
|
where
|
||||||
|
A: ToSocketAddrs + Display,
|
||||||
|
{
|
||||||
|
debug!("starting off-chain DKG Listener");
|
||||||
|
|
||||||
|
let listener = match TcpListener::bind(&self.address).await {
|
||||||
|
Ok(listener) => listener,
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to bind to {} - {}.", self.address, err);
|
||||||
|
process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((socket, remote_addr)) => self.on_connect(socket, remote_addr),
|
||||||
|
Err(err) => warn!("Failed to accept incoming connection - {:?}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
mod handler;
|
||||||
|
mod listener;
|
||||||
|
|
||||||
|
pub(crate) use listener::Listener;
|
||||||
@@ -0,0 +1,122 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::message::OffchainMessage;
|
||||||
|
use crate::networking::sender::{send_single_message, ConnectionConfig, SendResponse};
|
||||||
|
use futures::channel::mpsc;
|
||||||
|
use futures::{stream, StreamExt};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
// TODO: for now just leave it here and make it configurable with proper config later
|
||||||
|
const DEFAULT_CONCURRENCY: usize = 5;
|
||||||
|
|
||||||
|
type FeedbackSender = mpsc::UnboundedSender<SendResponse>;
|
||||||
|
|
||||||
|
pub(crate) struct Broadcaster {
|
||||||
|
addresses: Vec<SocketAddr>,
|
||||||
|
concurrency_level: usize,
|
||||||
|
config: ConnectionConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Broadcaster {
|
||||||
|
pub(crate) fn new(addresses: Vec<SocketAddr>, config: ConnectionConfig) -> Self {
|
||||||
|
Broadcaster {
|
||||||
|
addresses,
|
||||||
|
concurrency_level: DEFAULT_CONCURRENCY,
|
||||||
|
config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn with_concurrency_level(mut self, concurrency_level: usize) -> Self {
|
||||||
|
self.concurrency_level = concurrency_level;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_addresses(&mut self, new_addresses: Vec<SocketAddr>) {
|
||||||
|
self.addresses = new_addresses;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_broadcast_configs(
|
||||||
|
&self,
|
||||||
|
message: OffchainMessage,
|
||||||
|
feedback_sender: Option<FeedbackSender>,
|
||||||
|
) -> Vec<BroadcastConfig> {
|
||||||
|
self.addresses
|
||||||
|
.iter()
|
||||||
|
.map(|&address| BroadcastConfig {
|
||||||
|
address,
|
||||||
|
config: self.config,
|
||||||
|
feedback_sender: feedback_sender.clone(),
|
||||||
|
message: message.clone(),
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn broadcast_with_feedback(&self, msg: OffchainMessage) -> Vec<SendResponse> {
|
||||||
|
if self.addresses.is_empty() {
|
||||||
|
warn!("attempting to broadcast {} while no remotes are known", msg);
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
|
||||||
|
let (feedback_tx, mut feedback_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
stream::iter(self.create_broadcast_configs(msg, Some(feedback_tx)))
|
||||||
|
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let mut responses = Vec::new();
|
||||||
|
|
||||||
|
for _ in 0..self.addresses.len() {
|
||||||
|
// we should have received exactly self.addresses number of responses
|
||||||
|
// (they could be just Err failure responses, but should exist nonetheless)
|
||||||
|
match feedback_rx.try_next() {
|
||||||
|
Ok(Some(response)) => responses.push(response),
|
||||||
|
Err(_) | Ok(None) => {
|
||||||
|
error!("somehow we received fewer feedback responses than sent messages")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// the channel should have been drained and all sender should have been dropped
|
||||||
|
debug_assert!(matches!(feedback_rx.try_next(), Ok(None)));
|
||||||
|
responses
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn broadcast(&self, msg: OffchainMessage) {
|
||||||
|
if self.addresses.is_empty() {
|
||||||
|
warn!("attempting to broadcast {} while no remotes are known", msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
|
||||||
|
stream::iter(self.create_broadcast_configs(msg, None))
|
||||||
|
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// internal struct to have per-connection config on hand
|
||||||
|
struct BroadcastConfig {
|
||||||
|
address: SocketAddr,
|
||||||
|
config: ConnectionConfig,
|
||||||
|
feedback_sender: Option<FeedbackSender>,
|
||||||
|
message: OffchainMessage,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BroadcastConfig {
|
||||||
|
async fn send(self) {
|
||||||
|
let response = send_single_message(self.address, self.config, &self.message).await;
|
||||||
|
if let Some(feedback_sender) = self.feedback_sender {
|
||||||
|
// this can only fail if the receiver is disconnected which should never be the case
|
||||||
|
// thus we can ignore the possible error
|
||||||
|
let _ = feedback_sender.unbounded_send(response);
|
||||||
|
} else if let Err(err) = response.response {
|
||||||
|
// if we're not forwarding feedback, at least emit a warning about the failure
|
||||||
|
warn!(
|
||||||
|
"failed to broadcast {} to {} - {}",
|
||||||
|
self.message, self.address, err
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::codec::OffchainCodec;
|
||||||
|
use crate::networking::error::NetworkingError;
|
||||||
|
use crate::networking::message::OffchainMessage;
|
||||||
|
use crate::networking::sender::ConnectionConfig;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use std::io;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
use tokio_util::codec::Framed;
|
||||||
|
|
||||||
|
// this connection only exists for a single message
|
||||||
|
pub(crate) struct EphemeralConnection {
|
||||||
|
remote: SocketAddr,
|
||||||
|
conn: Framed<TcpStream, OffchainCodec>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EphemeralConnection {
|
||||||
|
pub(crate) async fn connect(
|
||||||
|
address: SocketAddr,
|
||||||
|
connection_timeout: Duration,
|
||||||
|
) -> io::Result<Self> {
|
||||||
|
trace!("attempting to connect to {}", address);
|
||||||
|
let conn = match timeout(connection_timeout, TcpStream::connect(address)).await {
|
||||||
|
Err(_timeout) => {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("timed out while attempting to send message to {}", address),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Ok(conn_res) => conn_res?,
|
||||||
|
};
|
||||||
|
let framed_conn = Framed::new(conn, OffchainCodec);
|
||||||
|
Ok(Self {
|
||||||
|
remote: address,
|
||||||
|
conn: framed_conn,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn remote(&self) -> SocketAddr {
|
||||||
|
self.remote
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(
|
||||||
|
&mut self,
|
||||||
|
message: &OffchainMessage,
|
||||||
|
send_timeout: Duration,
|
||||||
|
response_timeout: Option<Duration>,
|
||||||
|
) -> Result<Option<OffchainMessage>, NetworkingError> {
|
||||||
|
trace!("attempting to send to {}", self.remote);
|
||||||
|
match timeout(send_timeout, self.conn.send(message)).await {
|
||||||
|
Err(_timeout) => {
|
||||||
|
return Err(NetworkingError::Io(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
"timed out while attempting to send message",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
Ok(res) => res?,
|
||||||
|
}
|
||||||
|
if let Some(response_timeout) = response_timeout {
|
||||||
|
match timeout(response_timeout, self.conn.next()).await {
|
||||||
|
Err(_elapsed) => Ok(None),
|
||||||
|
Ok(response) => response.transpose(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn connect_and_send(
|
||||||
|
address: SocketAddr,
|
||||||
|
cfg: ConnectionConfig,
|
||||||
|
message: &OffchainMessage,
|
||||||
|
) -> Result<Option<OffchainMessage>, NetworkingError> {
|
||||||
|
let mut conn = EphemeralConnection::connect(address, cfg.connection_timeout).await?;
|
||||||
|
conn.send(message, cfg.send_timeout, cfg.response_timeout)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use crate::networking::error::NetworkingError;
|
||||||
|
use crate::networking::message::OffchainMessage;
|
||||||
|
use crate::networking::sender::broadcast::Broadcaster;
|
||||||
|
use crate::networking::sender::ephemeral::EphemeralConnection;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
pub(crate) mod broadcast;
|
||||||
|
pub(crate) mod ephemeral;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub(crate) struct ConnectionConfig {
|
||||||
|
pub(crate) connection_timeout: Duration,
|
||||||
|
pub(crate) response_timeout: Option<Duration>,
|
||||||
|
pub(crate) send_timeout: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct SendResponse {
|
||||||
|
pub(crate) source: SocketAddr,
|
||||||
|
pub(crate) response: Result<Option<OffchainMessage>, NetworkingError>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendResponse {
|
||||||
|
pub(crate) fn new(
|
||||||
|
source: SocketAddr,
|
||||||
|
response: Result<Option<OffchainMessage>, NetworkingError>,
|
||||||
|
) -> Self {
|
||||||
|
SendResponse { source, response }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn send_single_message(
|
||||||
|
address: SocketAddr,
|
||||||
|
cfg: ConnectionConfig,
|
||||||
|
message: &OffchainMessage,
|
||||||
|
) -> SendResponse {
|
||||||
|
let res = EphemeralConnection::connect_and_send(address, cfg, message).await;
|
||||||
|
SendResponse::new(address, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn broadcast_message(
|
||||||
|
addresses: Vec<SocketAddr>,
|
||||||
|
cfg: ConnectionConfig,
|
||||||
|
message: &OffchainMessage,
|
||||||
|
) {
|
||||||
|
Broadcaster::new(addresses, cfg)
|
||||||
|
.broadcast(message.clone())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn broadcast_message_with_feedback(
|
||||||
|
addresses: Vec<SocketAddr>,
|
||||||
|
cfg: ConnectionConfig,
|
||||||
|
message: &OffchainMessage,
|
||||||
|
) -> Vec<SendResponse> {
|
||||||
|
Broadcaster::new(addresses, cfg)
|
||||||
|
.broadcast_with_feedback(message.clone())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: for the original purposes of DKG stateless broadcasts and one-off sends were enough,
|
||||||
|
// so I never implemented proper persistent connections
|
||||||
Reference in New Issue
Block a user