Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d6a81d9213 | |||
| a6db5fe704 | |||
| fe57d08f3e | |||
| ae29b2300c | |||
| 7b98d62f96 | |||
| 6abe95ed61 |
Generated
+6
-2
@@ -1063,6 +1063,8 @@ dependencies = [
|
||||
"pemstore",
|
||||
"rand 0.7.3",
|
||||
"rand_chacha 0.2.2",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"subtle-encoding",
|
||||
"x25519-dalek",
|
||||
]
|
||||
@@ -1427,6 +1429,7 @@ version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d5c4b5e5959dc2c2b89918d8e2cc40fcdd623cef026ed09d2f0ee05199dc8e4"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"signature",
|
||||
]
|
||||
|
||||
@@ -1440,6 +1443,7 @@ dependencies = [
|
||||
"ed25519",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"sha2",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -4834,9 +4838,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_bytes"
|
||||
version = "0.11.5"
|
||||
version = "0.11.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9"
|
||||
checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -324,6 +324,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
|
||||
|
||||
Ok(ExecuteResult {
|
||||
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
||||
data: tx_res.tx_result.data,
|
||||
transaction_hash: tx_res.hash,
|
||||
gas_info,
|
||||
})
|
||||
@@ -364,6 +365,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
|
||||
|
||||
Ok(ExecuteResult {
|
||||
logs: parse_raw_logs(tx_res.tx_result.log)?,
|
||||
data: tx_res.tx_result.data,
|
||||
transaction_hash: tx_res.hash,
|
||||
gas_info,
|
||||
})
|
||||
|
||||
@@ -25,6 +25,7 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
|
||||
CodeInfoResponse, ContractCodeHistoryEntry as ProtoContractCodeHistoryEntry,
|
||||
ContractCodeHistoryOperationType, ContractInfo as ProtoContractInfo,
|
||||
};
|
||||
use cosmrs::tendermint::abci::Data;
|
||||
use cosmrs::tendermint::{abci, chain};
|
||||
use cosmrs::tx::{AccountNumber, Gas, SequenceNumber};
|
||||
use cosmrs::{tx, AccountId, Any, Coin};
|
||||
@@ -672,6 +673,8 @@ pub struct MigrateResult {
|
||||
pub struct ExecuteResult {
|
||||
pub logs: Vec<Log>,
|
||||
|
||||
pub data: Data,
|
||||
|
||||
/// Transaction hash (might be used as transaction ID)
|
||||
pub transaction_hash: tx::Hash,
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ cipher = { version = "0.4.3", optional = true }
|
||||
x25519-dalek = { version = "1.1", optional = true }
|
||||
ed25519-dalek = { version = "1.0", optional = true }
|
||||
rand = { version = "0.7.3", features = ["wasm-bindgen"], optional = true }
|
||||
serde_bytes = { version = "0.11.6", optional = true }
|
||||
serde_crate = { version = "1.0", optional = true, default_features = false, package = "serde" }
|
||||
subtle-encoding = { version = "0.5", features = ["bech32-preview"]}
|
||||
|
||||
# internal
|
||||
@@ -30,6 +32,7 @@ config = { path="../../common/config" }
|
||||
rand_chacha = "0.2"
|
||||
|
||||
[features]
|
||||
serde = ["serde_crate", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
|
||||
asymmetric = ["x25519-dalek", "ed25519-dalek"]
|
||||
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
|
||||
symmetric = ["aes", "ctr", "cipher", "generic-array"]
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||
#[cfg(feature = "rand")]
|
||||
use rand::{CryptoRng, RngCore};
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
|
||||
/// Size of a X25519 private key
|
||||
@@ -127,6 +129,28 @@ impl PublicKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl Serialize for PublicKey {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.0.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl<'d> Deserialize<'d> for PublicKey {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'d>,
|
||||
{
|
||||
Ok(PublicKey(x25519_dalek::PublicKey::deserialize(
|
||||
deserializer,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl PemStorableKey for PublicKey {
|
||||
type Error = KeyRecoveryError;
|
||||
|
||||
@@ -143,7 +167,6 @@ impl PemStorableKey for PublicKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PrivateKey(x25519_dalek::StaticSecret);
|
||||
|
||||
impl Display for PrivateKey {
|
||||
@@ -187,6 +210,28 @@ impl PrivateKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl Serialize for PrivateKey {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.0.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl<'d> Deserialize<'d> for PrivateKey {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'d>,
|
||||
{
|
||||
Ok(PrivateKey(x25519_dalek::StaticSecret::deserialize(
|
||||
deserializer,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl PemStorableKey for PrivateKey {
|
||||
type Error = KeyRecoveryError;
|
||||
|
||||
|
||||
@@ -10,6 +10,13 @@ use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::de::Error as SerdeError;
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
#[cfg(feature = "serde")]
|
||||
use serde_bytes::{ByteBuf as SerdeByteBuf, Bytes as SerdeBytes};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Ed25519RecoveryError {
|
||||
MalformedBytes(SignatureError),
|
||||
@@ -40,6 +47,7 @@ impl fmt::Display for Ed25519RecoveryError {
|
||||
impl std::error::Error for Ed25519RecoveryError {}
|
||||
|
||||
/// Keypair for usage in ed25519 EdDSA.
|
||||
#[derive(Debug)]
|
||||
pub struct KeyPair {
|
||||
private_key: PrivateKey,
|
||||
public_key: PublicKey,
|
||||
@@ -135,6 +143,28 @@ impl PublicKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl Serialize for PublicKey {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.0.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl<'d> Deserialize<'d> for PublicKey {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'d>,
|
||||
{
|
||||
Ok(PublicKey(ed25519_dalek::PublicKey::deserialize(
|
||||
deserializer,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl PemStorableKey for PublicKey {
|
||||
type Error = Ed25519RecoveryError;
|
||||
|
||||
@@ -200,6 +230,28 @@ impl PrivateKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl Serialize for PrivateKey {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
self.0.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl<'d> Deserialize<'d> for PrivateKey {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'d>,
|
||||
{
|
||||
Ok(PrivateKey(ed25519_dalek::SecretKey::deserialize(
|
||||
deserializer,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl PemStorableKey for PrivateKey {
|
||||
type Error = Ed25519RecoveryError;
|
||||
|
||||
@@ -216,7 +268,7 @@ impl PemStorableKey for PrivateKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct Signature(ed25519_dalek::Signature);
|
||||
|
||||
impl Signature {
|
||||
@@ -237,3 +289,24 @@ impl Signature {
|
||||
Ok(Signature(ed25519_dalek::Signature::from_bytes(bytes)?))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl Serialize for Signature {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
SerdeBytes::new(&self.to_bytes()).serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
impl<'d> Deserialize<'d> for Signature {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'d>,
|
||||
{
|
||||
let bytes = <SerdeByteBuf>::deserialize(deserializer)?;
|
||||
Signature::from_bytes(bytes.as_ref()).map_err(SerdeError::custom)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,5 +29,5 @@ pub use blake3;
|
||||
#[cfg(feature = "symmetric")]
|
||||
pub use ctr;
|
||||
|
||||
// TODO: this function uses all three modules: asymmetric crypto, symmetric crypto and derives key...,
|
||||
// so I don't know where to put it...
|
||||
#[cfg(feature = "serde")]
|
||||
extern crate serde_crate as serde;
|
||||
|
||||
@@ -18,6 +18,7 @@ cfg_if::cfg_if! {
|
||||
if #[cfg(network = "mainnet")] {
|
||||
pub const DEFAULT_NETWORK: all::Network = all::Network::MAINNET;
|
||||
pub const DENOM: &str = mainnet::DENOM;
|
||||
pub const STAKE_DENOM: &str = mainnet::STAKE_DENOM;
|
||||
|
||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_CONTRACT_ADDRESS;
|
||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||
@@ -25,6 +26,7 @@ cfg_if::cfg_if! {
|
||||
} else if #[cfg(network = "qa")] {
|
||||
pub const DEFAULT_NETWORK: all::Network = all::Network::QA;
|
||||
pub const DENOM: &str = qa::DENOM;
|
||||
pub const STAKE_DENOM: &str = qa::STAKE_DENOM;
|
||||
|
||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_CONTRACT_ADDRESS;
|
||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||
@@ -32,6 +34,7 @@ cfg_if::cfg_if! {
|
||||
} else if #[cfg(network = "sandbox")] {
|
||||
pub const DEFAULT_NETWORK: all::Network = all::Network::SANDBOX;
|
||||
pub const DENOM: &str = sandbox::DENOM;
|
||||
pub const STAKE_DENOM: &str = sandbox::STAKE_DENOM;
|
||||
|
||||
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_CONTRACT_ADDRESS;
|
||||
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_ERC20_CONTRACT_ADDRESS;
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
||||
|
||||
pub(crate) const BECH32_PREFIX: &str = "n";
|
||||
pub const DENOM: &str = "unym";
|
||||
pub const STAKE_DENOM: &str = "unyx";
|
||||
|
||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
||||
"n14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9sjyvg3g";
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
||||
|
||||
pub(crate) const BECH32_PREFIX: &str = "n";
|
||||
pub const DENOM: &str = "unym";
|
||||
pub const STAKE_DENOM: &str = "unyx";
|
||||
|
||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
|
||||
"n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep";
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
|
||||
|
||||
pub(crate) const BECH32_PREFIX: &str = "nymt";
|
||||
pub const DENOM: &str = "unymt";
|
||||
pub const STAKE_DENOM: &str = "unyxt";
|
||||
|
||||
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str = "nymt1ghd753shjuwexxywmgs4xz7x2q732vcnstz02j";
|
||||
pub(crate) const VESTING_CONTRACT_ADDRESS: &str = "nymt14ejqjyq8um4p3xfqj74yld5waqljf88fn549lh";
|
||||
|
||||
@@ -45,6 +45,9 @@ mod rewarded_set_updater;
|
||||
pub(crate) mod storage;
|
||||
mod swagger;
|
||||
|
||||
#[allow(dead_code)]
|
||||
mod networking;
|
||||
|
||||
#[cfg(feature = "coconut")]
|
||||
mod coconut;
|
||||
|
||||
@@ -590,6 +593,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
|
||||
async fn main() -> Result<()> {
|
||||
println!("Starting validator api...");
|
||||
|
||||
dotenv::dotenv()?;
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
|
||||
// instriment tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||
console_subscriber::init();
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::error::NetworkingError;
|
||||
use crate::networking::message::{Header, OffchainMessage};
|
||||
use crate::networking::PROTOCOL_VERSION;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
// TODO: that was fine for the purposes of DKG, we might want to increase it if it's used anywhere else
|
||||
const MAX_ALLOWED_MESSAGE_LEN: u64 = 2 * 1024 * 1024;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OffchainCodec;
|
||||
|
||||
impl<'a> Encoder<&'a OffchainMessage> for OffchainCodec {
|
||||
type Error = NetworkingError;
|
||||
|
||||
fn encode(&mut self, item: &OffchainMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
item.encode(dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder for OffchainCodec {
|
||||
type Item = OffchainMessage;
|
||||
type Error = NetworkingError;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
if src.len() < Header::LEN {
|
||||
// can't do much without being able to deserialize the header
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// header deserialization is simple enough to not be too cumbersome if it were to be repeated
|
||||
let header = Header::try_from_bytes(&src[..Header::LEN])?;
|
||||
|
||||
if header.payload_length > MAX_ALLOWED_MESSAGE_LEN {
|
||||
return Err(NetworkingError::MessageTooLarge {
|
||||
supported: MAX_ALLOWED_MESSAGE_LEN,
|
||||
received: header.payload_length,
|
||||
});
|
||||
}
|
||||
|
||||
if header.protocol_version != PROTOCOL_VERSION {
|
||||
return Err(NetworkingError::MismatchedProtocolVersion {
|
||||
expected: PROTOCOL_VERSION,
|
||||
received: header.protocol_version,
|
||||
});
|
||||
}
|
||||
|
||||
if src.len() < Header::LEN + header.payload_length as usize {
|
||||
// we haven't received the entire expected message yet.
|
||||
// However, reserve enough bytes in the buffer for it
|
||||
src.reserve(Header::LEN + header.payload_length as usize - src.len());
|
||||
|
||||
// We inform the Framed that we need more bytes to form the next frame.
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let payload = src[Header::LEN..Header::LEN + header.payload_length as usize].to_vec();
|
||||
src.advance(Header::LEN + header.payload_length as usize);
|
||||
|
||||
Ok(Some(OffchainMessage::try_from_bytes(payload)?))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::io;
|
||||
use thiserror::Error;
|
||||
|
||||
// this would probably need to adjusted or maybe incorporated into existing stuff as originally
|
||||
// this error was more extensive by being a global `DkgError`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum NetworkingError {
|
||||
#[error("Networking / IO error - {0}")]
|
||||
Io(#[from] io::Error),
|
||||
|
||||
#[error("Received message with specified size bigger than the supported maximum. Received: {received}, supported: {supported}")]
|
||||
MessageTooLarge { supported: u64, received: u64 },
|
||||
|
||||
#[error("Received message with unexpected protocol version. Received: {received}, expected: {expected}")]
|
||||
MismatchedProtocolVersion { expected: u32, received: u32 },
|
||||
|
||||
#[error("Failed to deal with serialization (or deserialization) of the message - {0}")]
|
||||
SerializationError(#[from] bincode::Error),
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::error::NetworkingError;
|
||||
use crate::networking::PROTOCOL_VERSION;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::io;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
|
||||
// I left a sample `NewDealingMessage` to show how it was originally implemented
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum OffchainMessage {
|
||||
// you'd add something like this:
|
||||
// NewDealing {
|
||||
// id: u64,
|
||||
// message: NewDealingMessage,
|
||||
// },
|
||||
ErrorResponse {
|
||||
id: Option<u64>,
|
||||
message: ErrorResponseMessage,
|
||||
},
|
||||
}
|
||||
|
||||
impl Display for OffchainMessage {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "OffchainMessage ")?;
|
||||
match self {
|
||||
// OffchainDkgMessage::NewDealing { id, message } => {
|
||||
// write!(f, "with id {} and message: {}", id, message)
|
||||
// }
|
||||
OffchainMessage::ErrorResponse { id, message } => {
|
||||
write!(f, "with id {:?} and message: {}", id, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
// pub struct NewDealingMessage {
|
||||
// pub epoch_id: u32,
|
||||
// pub dealing_bytes: Vec<u8>,
|
||||
// pub dealer_signature: identity::Signature,
|
||||
// }
|
||||
// impl Display for NewDealingMessage {
|
||||
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
// write!(
|
||||
// f,
|
||||
// "NewDealingMessage for epoch {} with length {}",
|
||||
// self.epoch_id,
|
||||
// self.dealing_bytes.len()
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
|
||||
pub enum ErrorResponseMessage {
|
||||
#[error("{typ} is not a valid request type")]
|
||||
InvalidRequest { typ: String },
|
||||
|
||||
#[error("This request failed to get resolved within {} seconds", .timeout.as_secs())]
|
||||
Timeout { timeout: Duration },
|
||||
}
|
||||
|
||||
impl OffchainMessage {
|
||||
pub(crate) fn new_error_response(id: Option<u64>, message: ErrorResponseMessage) -> Self {
|
||||
OffchainMessage::ErrorResponse { id, message }
|
||||
}
|
||||
|
||||
pub(crate) fn try_from_bytes(bytes: Vec<u8>) -> Result<Self, NetworkingError> {
|
||||
Ok(bincode::deserialize(&bytes)?)
|
||||
}
|
||||
|
||||
pub(crate) fn try_to_bytes(&self) -> Result<Vec<u8>, NetworkingError> {
|
||||
Ok(bincode::serialize(&self)?)
|
||||
}
|
||||
|
||||
fn frame(&self) -> Result<FramedOffchainDkgMessage, NetworkingError> {
|
||||
let payload = self.try_to_bytes()?;
|
||||
Ok(FramedOffchainDkgMessage {
|
||||
header: Header {
|
||||
payload_length: payload.len() as u64,
|
||||
protocol_version: PROTOCOL_VERSION,
|
||||
},
|
||||
payload,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn encode(&self, dst: &mut BytesMut) -> Result<(), NetworkingError> {
|
||||
dst.put(self.frame()?.into_bytes().as_ref());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct FramedOffchainDkgMessage {
|
||||
header: Header,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl FramedOffchainDkgMessage {
|
||||
fn into_bytes(mut self) -> Vec<u8> {
|
||||
let mut header_bytes = self.header.into_bytes();
|
||||
let mut out = Vec::with_capacity(header_bytes.len() + self.payload.len());
|
||||
|
||||
out.append(&mut header_bytes);
|
||||
out.append(&mut self.payload);
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub(crate) struct Header {
|
||||
pub(crate) payload_length: u64,
|
||||
pub(crate) protocol_version: u32,
|
||||
}
|
||||
|
||||
impl Header {
|
||||
pub(crate) const LEN: usize = 12;
|
||||
|
||||
pub(crate) fn into_bytes(self) -> Vec<u8> {
|
||||
let mut out = Vec::with_capacity(Self::LEN);
|
||||
out.extend_from_slice(&self.payload_length.to_be_bytes());
|
||||
out.extend_from_slice(&self.protocol_version.to_be_bytes());
|
||||
|
||||
debug_assert_eq!(Self::LEN, out.len());
|
||||
out
|
||||
}
|
||||
|
||||
pub(crate) fn try_from_bytes(bytes: &[u8]) -> Result<Self, NetworkingError> {
|
||||
if bytes.len() != Self::LEN {
|
||||
return Err(NetworkingError::Io(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"OffchainMessageType::Header: got {} bytes, expected: {}",
|
||||
bytes.len(),
|
||||
Self::LEN
|
||||
),
|
||||
)));
|
||||
}
|
||||
Ok(Header {
|
||||
payload_length: u64::from_be_bytes(bytes[..8].try_into().unwrap()),
|
||||
protocol_version: u32::from_be_bytes(bytes[8..].try_into().unwrap()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::networking::PROTOCOL_VERSION;
|
||||
|
||||
#[test]
|
||||
fn header_deserialization() {
|
||||
let valid_header = Header {
|
||||
payload_length: 1234,
|
||||
protocol_version: PROTOCOL_VERSION,
|
||||
};
|
||||
|
||||
let bytes = valid_header.into_bytes();
|
||||
assert_eq!(valid_header, Header::try_from_bytes(&bytes).unwrap())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// TODO: if this becomes too cumbersome, perhaps consider a more streamlined solution like tarpc
|
||||
// (I wouldn't have needed that for DKG, but if this is to be used for different purposes, maybe
|
||||
// it would have been more appropriate)
|
||||
|
||||
pub(crate) mod codec;
|
||||
pub(crate) mod error;
|
||||
pub(crate) mod message;
|
||||
pub(crate) mod receiver;
|
||||
pub(crate) mod sender;
|
||||
|
||||
pub(crate) const PROTOCOL_VERSION: u32 = 1;
|
||||
@@ -0,0 +1,98 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::codec::OffchainCodec;
|
||||
use crate::networking::message::{ErrorResponseMessage, OffchainMessage};
|
||||
// use crate::dkg::state::StateAccessor;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
const DEFAULT_MAX_CONNECTION_DURATION: Duration = Duration::from_secs(2 * 60 * 60);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionHandler {
|
||||
// connection cannot exist for more than this time
|
||||
max_connection_duration: Duration,
|
||||
// state_accessor: StateAccessor,
|
||||
conn: Framed<TcpStream, OffchainCodec>,
|
||||
remote: SocketAddr,
|
||||
}
|
||||
|
||||
impl ConnectionHandler {
|
||||
pub(crate) fn new(conn: TcpStream, remote: SocketAddr) -> Self {
|
||||
ConnectionHandler {
|
||||
max_connection_duration: DEFAULT_MAX_CONNECTION_DURATION,
|
||||
remote,
|
||||
conn: Framed::new(conn, OffchainCodec),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response(&mut self, response_message: OffchainMessage) {
|
||||
if let Err(err) = self.conn.send(&response_message).await {
|
||||
warn!("Failed to send response back to {} - {}", self.remote, err)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_error_response(&mut self, id: Option<u64>, error: ErrorResponseMessage) {
|
||||
self.send_response(OffchainMessage::new_error_response(id, error))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self, request: OffchainMessage) {
|
||||
match request {
|
||||
OffchainMessage::ErrorResponse { id, .. } => {
|
||||
self.send_error_response(
|
||||
id,
|
||||
ErrorResponseMessage::InvalidRequest {
|
||||
typ: "ErrorResponse".into(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn _handle_connection(&mut self) {
|
||||
debug!("Starting connection handler for {}", self.remote);
|
||||
|
||||
while let Some(framed_dkg_request) = self.conn.next().await {
|
||||
trace!("received new message from {}", self.remote);
|
||||
match framed_dkg_request {
|
||||
Ok(framed_dkg_request) => self.handle_request(framed_dkg_request).await,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"The socket connection got corrupted with error: {:?}. Closing the socket",
|
||||
err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Closing connection from {}", self.remote);
|
||||
}
|
||||
|
||||
pub async fn handle_connection(mut self) {
|
||||
let remote = self.remote;
|
||||
if timeout(self.max_connection_duration, self._handle_connection())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
warn!(
|
||||
"we timed out while trying to resolve connection from {}",
|
||||
remote
|
||||
);
|
||||
self.send_error_response(
|
||||
None,
|
||||
ErrorResponseMessage::Timeout {
|
||||
timeout: self.max_connection_duration,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::receiver::handler::ConnectionHandler;
|
||||
use log::debug;
|
||||
use std::fmt::Display;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
|
||||
|
||||
// note that we do not expect persistent connections between dealers, they should only really
|
||||
// exist for the duration of a single message exchange
|
||||
pub struct Listener<A> {
|
||||
address: A,
|
||||
}
|
||||
|
||||
impl<A> Listener<A> {
|
||||
pub(crate) fn new(address: A) -> Self {
|
||||
Listener { address }
|
||||
}
|
||||
|
||||
fn on_connect(&self, conn: TcpStream, remote: SocketAddr) {
|
||||
tokio::spawn(ConnectionHandler::new(conn, remote).handle_connection());
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&mut self)
|
||||
where
|
||||
A: ToSocketAddrs + Display,
|
||||
{
|
||||
debug!("starting off-chain DKG Listener");
|
||||
|
||||
let listener = match TcpListener::bind(&self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {}.", self.address, err);
|
||||
process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, remote_addr)) => self.on_connect(socket, remote_addr),
|
||||
Err(err) => warn!("Failed to accept incoming connection - {:?}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
mod handler;
|
||||
mod listener;
|
||||
|
||||
pub(crate) use listener::Listener;
|
||||
@@ -0,0 +1,122 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::message::OffchainMessage;
|
||||
use crate::networking::sender::{send_single_message, ConnectionConfig, SendResponse};
|
||||
use futures::channel::mpsc;
|
||||
use futures::{stream, StreamExt};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
// TODO: for now just leave it here and make it configurable with proper config later
|
||||
const DEFAULT_CONCURRENCY: usize = 5;
|
||||
|
||||
type FeedbackSender = mpsc::UnboundedSender<SendResponse>;
|
||||
|
||||
pub(crate) struct Broadcaster {
|
||||
addresses: Vec<SocketAddr>,
|
||||
concurrency_level: usize,
|
||||
config: ConnectionConfig,
|
||||
}
|
||||
|
||||
impl Broadcaster {
|
||||
pub(crate) fn new(addresses: Vec<SocketAddr>, config: ConnectionConfig) -> Self {
|
||||
Broadcaster {
|
||||
addresses,
|
||||
concurrency_level: DEFAULT_CONCURRENCY,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_concurrency_level(mut self, concurrency_level: usize) -> Self {
|
||||
self.concurrency_level = concurrency_level;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn set_addresses(&mut self, new_addresses: Vec<SocketAddr>) {
|
||||
self.addresses = new_addresses;
|
||||
}
|
||||
|
||||
fn create_broadcast_configs(
|
||||
&self,
|
||||
message: OffchainMessage,
|
||||
feedback_sender: Option<FeedbackSender>,
|
||||
) -> Vec<BroadcastConfig> {
|
||||
self.addresses
|
||||
.iter()
|
||||
.map(|&address| BroadcastConfig {
|
||||
address,
|
||||
config: self.config,
|
||||
feedback_sender: feedback_sender.clone(),
|
||||
message: message.clone(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) async fn broadcast_with_feedback(&self, msg: OffchainMessage) -> Vec<SendResponse> {
|
||||
if self.addresses.is_empty() {
|
||||
warn!("attempting to broadcast {} while no remotes are known", msg);
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
|
||||
let (feedback_tx, mut feedback_rx) = mpsc::unbounded();
|
||||
|
||||
stream::iter(self.create_broadcast_configs(msg, Some(feedback_tx)))
|
||||
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
|
||||
.await;
|
||||
|
||||
let mut responses = Vec::new();
|
||||
|
||||
for _ in 0..self.addresses.len() {
|
||||
// we should have received exactly self.addresses number of responses
|
||||
// (they could be just Err failure responses, but should exist nonetheless)
|
||||
match feedback_rx.try_next() {
|
||||
Ok(Some(response)) => responses.push(response),
|
||||
Err(_) | Ok(None) => {
|
||||
error!("somehow we received fewer feedback responses than sent messages")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// the channel should have been drained and all sender should have been dropped
|
||||
debug_assert!(matches!(feedback_rx.try_next(), Ok(None)));
|
||||
responses
|
||||
}
|
||||
|
||||
pub(crate) async fn broadcast(&self, msg: OffchainMessage) {
|
||||
if self.addresses.is_empty() {
|
||||
warn!("attempting to broadcast {} while no remotes are known", msg);
|
||||
return;
|
||||
}
|
||||
|
||||
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
|
||||
stream::iter(self.create_broadcast_configs(msg, None))
|
||||
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
// internal struct to have per-connection config on hand
|
||||
struct BroadcastConfig {
|
||||
address: SocketAddr,
|
||||
config: ConnectionConfig,
|
||||
feedback_sender: Option<FeedbackSender>,
|
||||
message: OffchainMessage,
|
||||
}
|
||||
|
||||
impl BroadcastConfig {
|
||||
async fn send(self) {
|
||||
let response = send_single_message(self.address, self.config, &self.message).await;
|
||||
if let Some(feedback_sender) = self.feedback_sender {
|
||||
// this can only fail if the receiver is disconnected which should never be the case
|
||||
// thus we can ignore the possible error
|
||||
let _ = feedback_sender.unbounded_send(response);
|
||||
} else if let Err(err) = response.response {
|
||||
// if we're not forwarding feedback, at least emit a warning about the failure
|
||||
warn!(
|
||||
"failed to broadcast {} to {} - {}",
|
||||
self.message, self.address, err
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::codec::OffchainCodec;
|
||||
use crate::networking::error::NetworkingError;
|
||||
use crate::networking::message::OffchainMessage;
|
||||
use crate::networking::sender::ConnectionConfig;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
// this connection only exists for a single message
|
||||
pub(crate) struct EphemeralConnection {
|
||||
remote: SocketAddr,
|
||||
conn: Framed<TcpStream, OffchainCodec>,
|
||||
}
|
||||
|
||||
impl EphemeralConnection {
|
||||
pub(crate) async fn connect(
|
||||
address: SocketAddr,
|
||||
connection_timeout: Duration,
|
||||
) -> io::Result<Self> {
|
||||
trace!("attempting to connect to {}", address);
|
||||
let conn = match timeout(connection_timeout, TcpStream::connect(address)).await {
|
||||
Err(_timeout) => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("timed out while attempting to send message to {}", address),
|
||||
))
|
||||
}
|
||||
Ok(conn_res) => conn_res?,
|
||||
};
|
||||
let framed_conn = Framed::new(conn, OffchainCodec);
|
||||
Ok(Self {
|
||||
remote: address,
|
||||
conn: framed_conn,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn remote(&self) -> SocketAddr {
|
||||
self.remote
|
||||
}
|
||||
|
||||
async fn send(
|
||||
&mut self,
|
||||
message: &OffchainMessage,
|
||||
send_timeout: Duration,
|
||||
response_timeout: Option<Duration>,
|
||||
) -> Result<Option<OffchainMessage>, NetworkingError> {
|
||||
trace!("attempting to send to {}", self.remote);
|
||||
match timeout(send_timeout, self.conn.send(message)).await {
|
||||
Err(_timeout) => {
|
||||
return Err(NetworkingError::Io(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"timed out while attempting to send message",
|
||||
)))
|
||||
}
|
||||
Ok(res) => res?,
|
||||
}
|
||||
if let Some(response_timeout) = response_timeout {
|
||||
match timeout(response_timeout, self.conn.next()).await {
|
||||
Err(_elapsed) => Ok(None),
|
||||
Ok(response) => response.transpose(),
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_and_send(
|
||||
address: SocketAddr,
|
||||
cfg: ConnectionConfig,
|
||||
message: &OffchainMessage,
|
||||
) -> Result<Option<OffchainMessage>, NetworkingError> {
|
||||
let mut conn = EphemeralConnection::connect(address, cfg.connection_timeout).await?;
|
||||
conn.send(message, cfg.send_timeout, cfg.response_timeout)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::networking::error::NetworkingError;
|
||||
use crate::networking::message::OffchainMessage;
|
||||
use crate::networking::sender::broadcast::Broadcaster;
|
||||
use crate::networking::sender::ephemeral::EphemeralConnection;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
pub(crate) mod broadcast;
|
||||
pub(crate) mod ephemeral;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct ConnectionConfig {
|
||||
pub(crate) connection_timeout: Duration,
|
||||
pub(crate) response_timeout: Option<Duration>,
|
||||
pub(crate) send_timeout: Duration,
|
||||
}
|
||||
|
||||
pub(crate) struct SendResponse {
|
||||
pub(crate) source: SocketAddr,
|
||||
pub(crate) response: Result<Option<OffchainMessage>, NetworkingError>,
|
||||
}
|
||||
|
||||
impl SendResponse {
|
||||
pub(crate) fn new(
|
||||
source: SocketAddr,
|
||||
response: Result<Option<OffchainMessage>, NetworkingError>,
|
||||
) -> Self {
|
||||
SendResponse { source, response }
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_single_message(
|
||||
address: SocketAddr,
|
||||
cfg: ConnectionConfig,
|
||||
message: &OffchainMessage,
|
||||
) -> SendResponse {
|
||||
let res = EphemeralConnection::connect_and_send(address, cfg, message).await;
|
||||
SendResponse::new(address, res)
|
||||
}
|
||||
|
||||
pub(crate) async fn broadcast_message(
|
||||
addresses: Vec<SocketAddr>,
|
||||
cfg: ConnectionConfig,
|
||||
message: &OffchainMessage,
|
||||
) {
|
||||
Broadcaster::new(addresses, cfg)
|
||||
.broadcast(message.clone())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn broadcast_message_with_feedback(
|
||||
addresses: Vec<SocketAddr>,
|
||||
cfg: ConnectionConfig,
|
||||
message: &OffchainMessage,
|
||||
) -> Vec<SendResponse> {
|
||||
Broadcaster::new(addresses, cfg)
|
||||
.broadcast_with_feedback(message.clone())
|
||||
.await
|
||||
}
|
||||
|
||||
// NOTE: for the original purposes of DKG stateless broadcasts and one-off sends were enough,
|
||||
// so I never implemented proper persistent connections
|
||||
Reference in New Issue
Block a user