Compare commits

...

6 Commits

Author SHA1 Message Date
Jędrzej Stuczyński d6a81d9213 initial validator API-networking related things
adapted from DKG impl
2022-05-23 15:34:48 +01:00
Jędrzej Stuczyński a6db5fe704 Added STATE_DENOM network specific constant 2022-05-23 13:37:53 +01:00
Jędrzej Stuczyński fe57d08f3e actually calling dotenv at validator API startup 2022-05-23 13:36:05 +01:00
Jędrzej Stuczyński ae29b2300c optional serde support for x25519 keys 2022-05-23 13:34:28 +01:00
Jędrzej Stuczyński 7b98d62f96 optional serde support for ed25519 keys 2022-05-23 12:17:58 +01:00
Jędrzej Stuczyński 6abe95ed61 Added abci::Data field to ExecuteResult 2022-05-23 12:12:03 +01:00
22 changed files with 835 additions and 6 deletions
Generated
+6 -2
View File
@@ -1063,6 +1063,8 @@ dependencies = [
"pemstore",
"rand 0.7.3",
"rand_chacha 0.2.2",
"serde",
"serde_bytes",
"subtle-encoding",
"x25519-dalek",
]
@@ -1427,6 +1429,7 @@ version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d5c4b5e5959dc2c2b89918d8e2cc40fcdd623cef026ed09d2f0ee05199dc8e4"
dependencies = [
"serde",
"signature",
]
@@ -1440,6 +1443,7 @@ dependencies = [
"ed25519",
"rand 0.7.3",
"serde",
"serde_bytes",
"sha2",
"zeroize",
]
@@ -4834,9 +4838,9 @@ dependencies = [
[[package]]
name = "serde_bytes"
version = "0.11.5"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9"
checksum = "212e73464ebcde48d723aa02eb270ba62eff38a9b732df31f33f1b4e145f3a54"
dependencies = [
"serde",
]
@@ -324,6 +324,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
Ok(ExecuteResult {
logs: parse_raw_logs(tx_res.tx_result.log)?,
data: tx_res.tx_result.data,
transaction_hash: tx_res.hash,
gas_info,
})
@@ -364,6 +365,7 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
Ok(ExecuteResult {
logs: parse_raw_logs(tx_res.tx_result.log)?,
data: tx_res.tx_result.data,
transaction_hash: tx_res.hash,
gas_info,
})
@@ -25,6 +25,7 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
CodeInfoResponse, ContractCodeHistoryEntry as ProtoContractCodeHistoryEntry,
ContractCodeHistoryOperationType, ContractInfo as ProtoContractInfo,
};
use cosmrs::tendermint::abci::Data;
use cosmrs::tendermint::{abci, chain};
use cosmrs::tx::{AccountNumber, Gas, SequenceNumber};
use cosmrs::{tx, AccountId, Any, Coin};
@@ -672,6 +673,8 @@ pub struct MigrateResult {
pub struct ExecuteResult {
pub logs: Vec<Log>,
pub data: Data,
/// Transaction hash (might be used as transaction ID)
pub transaction_hash: tx::Hash,
+3
View File
@@ -19,6 +19,8 @@ cipher = { version = "0.4.3", optional = true }
x25519-dalek = { version = "1.1", optional = true }
ed25519-dalek = { version = "1.0", optional = true }
rand = { version = "0.7.3", features = ["wasm-bindgen"], optional = true }
serde_bytes = { version = "0.11.6", optional = true }
serde_crate = { version = "1.0", optional = true, default_features = false, package = "serde" }
subtle-encoding = { version = "0.5", features = ["bech32-preview"]}
# internal
@@ -30,6 +32,7 @@ config = { path="../../common/config" }
rand_chacha = "0.2"
[features]
serde = ["serde_crate", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
asymmetric = ["x25519-dalek", "ed25519-dalek"]
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
symmetric = ["aes", "ctr", "cipher", "generic-array"]
+46 -1
View File
@@ -4,6 +4,8 @@
use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(feature = "rand")]
use rand::{CryptoRng, RngCore};
#[cfg(feature = "serde")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{self, Display, Formatter};
/// Size of a X25519 private key
@@ -127,6 +129,28 @@ impl PublicKey {
}
}
#[cfg(feature = "serde")]
impl Serialize for PublicKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'d> Deserialize<'d> for PublicKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
Ok(PublicKey(x25519_dalek::PublicKey::deserialize(
deserializer,
)?))
}
}
impl PemStorableKey for PublicKey {
type Error = KeyRecoveryError;
@@ -143,7 +167,6 @@ impl PemStorableKey for PublicKey {
}
}
#[derive(Clone)]
pub struct PrivateKey(x25519_dalek::StaticSecret);
impl Display for PrivateKey {
@@ -187,6 +210,28 @@ impl PrivateKey {
}
}
#[cfg(feature = "serde")]
impl Serialize for PrivateKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'d> Deserialize<'d> for PrivateKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
Ok(PrivateKey(x25519_dalek::StaticSecret::deserialize(
deserializer,
)?))
}
}
impl PemStorableKey for PrivateKey {
type Error = KeyRecoveryError;
+74 -1
View File
@@ -10,6 +10,13 @@ use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use rand::{CryptoRng, RngCore};
use std::fmt::{self, Display, Formatter};
#[cfg(feature = "serde")]
use serde::de::Error as SerdeError;
#[cfg(feature = "serde")]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
#[cfg(feature = "serde")]
use serde_bytes::{ByteBuf as SerdeByteBuf, Bytes as SerdeBytes};
#[derive(Debug)]
pub enum Ed25519RecoveryError {
MalformedBytes(SignatureError),
@@ -40,6 +47,7 @@ impl fmt::Display for Ed25519RecoveryError {
impl std::error::Error for Ed25519RecoveryError {}
/// Keypair for usage in ed25519 EdDSA.
#[derive(Debug)]
pub struct KeyPair {
private_key: PrivateKey,
public_key: PublicKey,
@@ -135,6 +143,28 @@ impl PublicKey {
}
}
#[cfg(feature = "serde")]
impl Serialize for PublicKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'d> Deserialize<'d> for PublicKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
Ok(PublicKey(ed25519_dalek::PublicKey::deserialize(
deserializer,
)?))
}
}
impl PemStorableKey for PublicKey {
type Error = Ed25519RecoveryError;
@@ -200,6 +230,28 @@ impl PrivateKey {
}
}
#[cfg(feature = "serde")]
impl Serialize for PrivateKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'d> Deserialize<'d> for PrivateKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
Ok(PrivateKey(ed25519_dalek::SecretKey::deserialize(
deserializer,
)?))
}
}
impl PemStorableKey for PrivateKey {
type Error = Ed25519RecoveryError;
@@ -216,7 +268,7 @@ impl PemStorableKey for PrivateKey {
}
}
#[derive(Debug)]
#[derive(Copy, Clone, Debug)]
pub struct Signature(ed25519_dalek::Signature);
impl Signature {
@@ -237,3 +289,24 @@ impl Signature {
Ok(Signature(ed25519_dalek::Signature::from_bytes(bytes)?))
}
}
#[cfg(feature = "serde")]
impl Serialize for Signature {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
SerdeBytes::new(&self.to_bytes()).serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'d> Deserialize<'d> for Signature {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'d>,
{
let bytes = <SerdeByteBuf>::deserialize(deserializer)?;
Signature::from_bytes(bytes.as_ref()).map_err(SerdeError::custom)
}
}
+2 -2
View File
@@ -29,5 +29,5 @@ pub use blake3;
#[cfg(feature = "symmetric")]
pub use ctr;
// TODO: this function uses all three modules: asymmetric crypto, symmetric crypto and derives key...,
// so I don't know where to put it...
#[cfg(feature = "serde")]
extern crate serde_crate as serde;
+3
View File
@@ -18,6 +18,7 @@ cfg_if::cfg_if! {
if #[cfg(network = "mainnet")] {
pub const DEFAULT_NETWORK: all::Network = all::Network::MAINNET;
pub const DENOM: &str = mainnet::DENOM;
pub const STAKE_DENOM: &str = mainnet::STAKE_DENOM;
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_CONTRACT_ADDRESS;
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = mainnet::_ETH_ERC20_CONTRACT_ADDRESS;
@@ -25,6 +26,7 @@ cfg_if::cfg_if! {
} else if #[cfg(network = "qa")] {
pub const DEFAULT_NETWORK: all::Network = all::Network::QA;
pub const DENOM: &str = qa::DENOM;
pub const STAKE_DENOM: &str = qa::STAKE_DENOM;
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_CONTRACT_ADDRESS;
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = qa::_ETH_ERC20_CONTRACT_ADDRESS;
@@ -32,6 +34,7 @@ cfg_if::cfg_if! {
} else if #[cfg(network = "sandbox")] {
pub const DEFAULT_NETWORK: all::Network = all::Network::SANDBOX;
pub const DENOM: &str = sandbox::DENOM;
pub const STAKE_DENOM: &str = sandbox::STAKE_DENOM;
pub const ETH_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_CONTRACT_ADDRESS;
pub const ETH_ERC20_CONTRACT_ADDRESS: [u8; 20] = sandbox::_ETH_ERC20_CONTRACT_ADDRESS;
+1
View File
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
pub(crate) const BECH32_PREFIX: &str = "n";
pub const DENOM: &str = "unym";
pub const STAKE_DENOM: &str = "unyx";
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
"n14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9sjyvg3g";
+1
View File
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
pub(crate) const BECH32_PREFIX: &str = "n";
pub const DENOM: &str = "unym";
pub const STAKE_DENOM: &str = "unyx";
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str =
"n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep";
+1
View File
@@ -5,6 +5,7 @@ use crate::ValidatorDetails;
pub(crate) const BECH32_PREFIX: &str = "nymt";
pub const DENOM: &str = "unymt";
pub const STAKE_DENOM: &str = "unyxt";
pub(crate) const MIXNET_CONTRACT_ADDRESS: &str = "nymt1ghd753shjuwexxywmgs4xz7x2q732vcnstz02j";
pub(crate) const VESTING_CONTRACT_ADDRESS: &str = "nymt14ejqjyq8um4p3xfqj74yld5waqljf88fn549lh";
+5
View File
@@ -45,6 +45,9 @@ mod rewarded_set_updater;
pub(crate) mod storage;
mod swagger;
#[allow(dead_code)]
mod networking;
#[cfg(feature = "coconut")]
mod coconut;
@@ -590,6 +593,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
async fn main() -> Result<()> {
println!("Starting validator api...");
dotenv::dotenv()?;
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
// instriment tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
+65
View File
@@ -0,0 +1,65 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::error::NetworkingError;
use crate::networking::message::{Header, OffchainMessage};
use crate::networking::PROTOCOL_VERSION;
use bytes::{Buf, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
// TODO: that was fine for the purposes of DKG, we might want to increase it if it's used anywhere else
const MAX_ALLOWED_MESSAGE_LEN: u64 = 2 * 1024 * 1024;
#[derive(Debug)]
pub struct OffchainCodec;
impl<'a> Encoder<&'a OffchainMessage> for OffchainCodec {
type Error = NetworkingError;
fn encode(&mut self, item: &OffchainMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
item.encode(dst)
}
}
impl Decoder for OffchainCodec {
type Item = OffchainMessage;
type Error = NetworkingError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < Header::LEN {
// can't do much without being able to deserialize the header
return Ok(None);
}
// header deserialization is simple enough to not be too cumbersome if it were to be repeated
let header = Header::try_from_bytes(&src[..Header::LEN])?;
if header.payload_length > MAX_ALLOWED_MESSAGE_LEN {
return Err(NetworkingError::MessageTooLarge {
supported: MAX_ALLOWED_MESSAGE_LEN,
received: header.payload_length,
});
}
if header.protocol_version != PROTOCOL_VERSION {
return Err(NetworkingError::MismatchedProtocolVersion {
expected: PROTOCOL_VERSION,
received: header.protocol_version,
});
}
if src.len() < Header::LEN + header.payload_length as usize {
// we haven't received the entire expected message yet.
// However, reserve enough bytes in the buffer for it
src.reserve(Header::LEN + header.payload_length as usize - src.len());
// We inform the Framed that we need more bytes to form the next frame.
return Ok(None);
}
let payload = src[Header::LEN..Header::LEN + header.payload_length as usize].to_vec();
src.advance(Header::LEN + header.payload_length as usize);
Ok(Some(OffchainMessage::try_from_bytes(payload)?))
}
}
+22
View File
@@ -0,0 +1,22 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::io;
use thiserror::Error;
// this would probably need to adjusted or maybe incorporated into existing stuff as originally
// this error was more extensive by being a global `DkgError`
#[derive(Error, Debug)]
pub enum NetworkingError {
#[error("Networking / IO error - {0}")]
Io(#[from] io::Error),
#[error("Received message with specified size bigger than the supported maximum. Received: {received}, supported: {supported}")]
MessageTooLarge { supported: u64, received: u64 },
#[error("Received message with unexpected protocol version. Received: {received}, expected: {expected}")]
MismatchedProtocolVersion { expected: u32, received: u32 },
#[error("Failed to deal with serialization (or deserialization) of the message - {0}")]
SerializationError(#[from] bincode::Error),
}
+165
View File
@@ -0,0 +1,165 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::error::NetworkingError;
use crate::networking::PROTOCOL_VERSION;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::io;
use std::time::Duration;
use thiserror::Error;
// I left a sample `NewDealingMessage` to show how it was originally implemented
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OffchainMessage {
// you'd add something like this:
// NewDealing {
// id: u64,
// message: NewDealingMessage,
// },
ErrorResponse {
id: Option<u64>,
message: ErrorResponseMessage,
},
}
impl Display for OffchainMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "OffchainMessage ")?;
match self {
// OffchainDkgMessage::NewDealing { id, message } => {
// write!(f, "with id {} and message: {}", id, message)
// }
OffchainMessage::ErrorResponse { id, message } => {
write!(f, "with id {:?} and message: {}", id, message)
}
}
}
}
// #[derive(Debug, Clone, Serialize, Deserialize)]
// pub struct NewDealingMessage {
// pub epoch_id: u32,
// pub dealing_bytes: Vec<u8>,
// pub dealer_signature: identity::Signature,
// }
// impl Display for NewDealingMessage {
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// write!(
// f,
// "NewDealingMessage for epoch {} with length {}",
// self.epoch_id,
// self.dealing_bytes.len()
// )
// }
// }
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
pub enum ErrorResponseMessage {
#[error("{typ} is not a valid request type")]
InvalidRequest { typ: String },
#[error("This request failed to get resolved within {} seconds", .timeout.as_secs())]
Timeout { timeout: Duration },
}
impl OffchainMessage {
pub(crate) fn new_error_response(id: Option<u64>, message: ErrorResponseMessage) -> Self {
OffchainMessage::ErrorResponse { id, message }
}
pub(crate) fn try_from_bytes(bytes: Vec<u8>) -> Result<Self, NetworkingError> {
Ok(bincode::deserialize(&bytes)?)
}
pub(crate) fn try_to_bytes(&self) -> Result<Vec<u8>, NetworkingError> {
Ok(bincode::serialize(&self)?)
}
fn frame(&self) -> Result<FramedOffchainDkgMessage, NetworkingError> {
let payload = self.try_to_bytes()?;
Ok(FramedOffchainDkgMessage {
header: Header {
payload_length: payload.len() as u64,
protocol_version: PROTOCOL_VERSION,
},
payload,
})
}
pub(crate) fn encode(&self, dst: &mut BytesMut) -> Result<(), NetworkingError> {
dst.put(self.frame()?.into_bytes().as_ref());
Ok(())
}
}
struct FramedOffchainDkgMessage {
header: Header,
payload: Vec<u8>,
}
impl FramedOffchainDkgMessage {
fn into_bytes(mut self) -> Vec<u8> {
let mut header_bytes = self.header.into_bytes();
let mut out = Vec::with_capacity(header_bytes.len() + self.payload.len());
out.append(&mut header_bytes);
out.append(&mut self.payload);
out
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) struct Header {
pub(crate) payload_length: u64,
pub(crate) protocol_version: u32,
}
impl Header {
pub(crate) const LEN: usize = 12;
pub(crate) fn into_bytes(self) -> Vec<u8> {
let mut out = Vec::with_capacity(Self::LEN);
out.extend_from_slice(&self.payload_length.to_be_bytes());
out.extend_from_slice(&self.protocol_version.to_be_bytes());
debug_assert_eq!(Self::LEN, out.len());
out
}
pub(crate) fn try_from_bytes(bytes: &[u8]) -> Result<Self, NetworkingError> {
if bytes.len() != Self::LEN {
return Err(NetworkingError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"OffchainMessageType::Header: got {} bytes, expected: {}",
bytes.len(),
Self::LEN
),
)));
}
Ok(Header {
payload_length: u64::from_be_bytes(bytes[..8].try_into().unwrap()),
protocol_version: u32::from_be_bytes(bytes[8..].try_into().unwrap()),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::networking::PROTOCOL_VERSION;
#[test]
fn header_deserialization() {
let valid_header = Header {
payload_length: 1234,
protocol_version: PROTOCOL_VERSION,
};
let bytes = valid_header.into_bytes();
assert_eq!(valid_header, Header::try_from_bytes(&bytes).unwrap())
}
}
+14
View File
@@ -0,0 +1,14 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// TODO: if this becomes too cumbersome, perhaps consider a more streamlined solution like tarpc
// (I wouldn't have needed that for DKG, but if this is to be used for different purposes, maybe
// it would have been more appropriate)
pub(crate) mod codec;
pub(crate) mod error;
pub(crate) mod message;
pub(crate) mod receiver;
pub(crate) mod sender;
pub(crate) const PROTOCOL_VERSION: u32 = 1;
@@ -0,0 +1,98 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::codec::OffchainCodec;
use crate::networking::message::{ErrorResponseMessage, OffchainMessage};
// use crate::dkg::state::StateAccessor;
use futures::{SinkExt, StreamExt};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_util::codec::Framed;
const DEFAULT_MAX_CONNECTION_DURATION: Duration = Duration::from_secs(2 * 60 * 60);
#[derive(Debug)]
pub struct ConnectionHandler {
// connection cannot exist for more than this time
max_connection_duration: Duration,
// state_accessor: StateAccessor,
conn: Framed<TcpStream, OffchainCodec>,
remote: SocketAddr,
}
impl ConnectionHandler {
pub(crate) fn new(conn: TcpStream, remote: SocketAddr) -> Self {
ConnectionHandler {
max_connection_duration: DEFAULT_MAX_CONNECTION_DURATION,
remote,
conn: Framed::new(conn, OffchainCodec),
}
}
async fn send_response(&mut self, response_message: OffchainMessage) {
if let Err(err) = self.conn.send(&response_message).await {
warn!("Failed to send response back to {} - {}", self.remote, err)
}
}
async fn send_error_response(&mut self, id: Option<u64>, error: ErrorResponseMessage) {
self.send_response(OffchainMessage::new_error_response(id, error))
.await
}
async fn handle_request(&mut self, request: OffchainMessage) {
match request {
OffchainMessage::ErrorResponse { id, .. } => {
self.send_error_response(
id,
ErrorResponseMessage::InvalidRequest {
typ: "ErrorResponse".into(),
},
)
.await
}
}
}
async fn _handle_connection(&mut self) {
debug!("Starting connection handler for {}", self.remote);
while let Some(framed_dkg_request) = self.conn.next().await {
trace!("received new message from {}", self.remote);
match framed_dkg_request {
Ok(framed_dkg_request) => self.handle_request(framed_dkg_request).await,
Err(err) => {
warn!(
"The socket connection got corrupted with error: {:?}. Closing the socket",
err
);
break;
}
}
}
debug!("Closing connection from {}", self.remote);
}
pub async fn handle_connection(mut self) {
let remote = self.remote;
if timeout(self.max_connection_duration, self._handle_connection())
.await
.is_err()
{
warn!(
"we timed out while trying to resolve connection from {}",
remote
);
self.send_error_response(
None,
ErrorResponseMessage::Timeout {
timeout: self.max_connection_duration,
},
)
.await;
}
}
}
@@ -0,0 +1,47 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::receiver::handler::ConnectionHandler;
use log::debug;
use std::fmt::Display;
use std::net::SocketAddr;
use std::process;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
// note that we do not expect persistent connections between dealers, they should only really
// exist for the duration of a single message exchange
pub struct Listener<A> {
address: A,
}
impl<A> Listener<A> {
pub(crate) fn new(address: A) -> Self {
Listener { address }
}
fn on_connect(&self, conn: TcpStream, remote: SocketAddr) {
tokio::spawn(ConnectionHandler::new(conn, remote).handle_connection());
}
pub(crate) async fn run(&mut self)
where
A: ToSocketAddrs + Display,
{
debug!("starting off-chain DKG Listener");
let listener = match TcpListener::bind(&self.address).await {
Ok(listener) => listener,
Err(err) => {
error!("Failed to bind to {} - {}.", self.address, err);
process::exit(1);
}
};
loop {
match listener.accept().await {
Ok((socket, remote_addr)) => self.on_connect(socket, remote_addr),
Err(err) => warn!("Failed to accept incoming connection - {:?}", err),
}
}
}
}
@@ -0,0 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
mod handler;
mod listener;
pub(crate) use listener::Listener;
@@ -0,0 +1,122 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::message::OffchainMessage;
use crate::networking::sender::{send_single_message, ConnectionConfig, SendResponse};
use futures::channel::mpsc;
use futures::{stream, StreamExt};
use std::net::SocketAddr;
// TODO: for now just leave it here and make it configurable with proper config later
const DEFAULT_CONCURRENCY: usize = 5;
type FeedbackSender = mpsc::UnboundedSender<SendResponse>;
pub(crate) struct Broadcaster {
addresses: Vec<SocketAddr>,
concurrency_level: usize,
config: ConnectionConfig,
}
impl Broadcaster {
pub(crate) fn new(addresses: Vec<SocketAddr>, config: ConnectionConfig) -> Self {
Broadcaster {
addresses,
concurrency_level: DEFAULT_CONCURRENCY,
config,
}
}
pub(crate) fn with_concurrency_level(mut self, concurrency_level: usize) -> Self {
self.concurrency_level = concurrency_level;
self
}
pub(crate) fn set_addresses(&mut self, new_addresses: Vec<SocketAddr>) {
self.addresses = new_addresses;
}
fn create_broadcast_configs(
&self,
message: OffchainMessage,
feedback_sender: Option<FeedbackSender>,
) -> Vec<BroadcastConfig> {
self.addresses
.iter()
.map(|&address| BroadcastConfig {
address,
config: self.config,
feedback_sender: feedback_sender.clone(),
message: message.clone(),
})
.collect()
}
pub(crate) async fn broadcast_with_feedback(&self, msg: OffchainMessage) -> Vec<SendResponse> {
if self.addresses.is_empty() {
warn!("attempting to broadcast {} while no remotes are known", msg);
return Vec::new();
}
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
let (feedback_tx, mut feedback_rx) = mpsc::unbounded();
stream::iter(self.create_broadcast_configs(msg, Some(feedback_tx)))
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
.await;
let mut responses = Vec::new();
for _ in 0..self.addresses.len() {
// we should have received exactly self.addresses number of responses
// (they could be just Err failure responses, but should exist nonetheless)
match feedback_rx.try_next() {
Ok(Some(response)) => responses.push(response),
Err(_) | Ok(None) => {
error!("somehow we received fewer feedback responses than sent messages")
}
}
}
// the channel should have been drained and all sender should have been dropped
debug_assert!(matches!(feedback_rx.try_next(), Ok(None)));
responses
}
pub(crate) async fn broadcast(&self, msg: OffchainMessage) {
if self.addresses.is_empty() {
warn!("attempting to broadcast {} while no remotes are known", msg);
return;
}
debug!("broadcasting {} to {} remotes", msg, self.addresses.len());
stream::iter(self.create_broadcast_configs(msg, None))
.for_each_concurrent(self.concurrency_level, |cfg| cfg.send())
.await
}
}
// internal struct to have per-connection config on hand
struct BroadcastConfig {
address: SocketAddr,
config: ConnectionConfig,
feedback_sender: Option<FeedbackSender>,
message: OffchainMessage,
}
impl BroadcastConfig {
async fn send(self) {
let response = send_single_message(self.address, self.config, &self.message).await;
if let Some(feedback_sender) = self.feedback_sender {
// this can only fail if the receiver is disconnected which should never be the case
// thus we can ignore the possible error
let _ = feedback_sender.unbounded_send(response);
} else if let Err(err) = response.response {
// if we're not forwarding feedback, at least emit a warning about the failure
warn!(
"failed to broadcast {} to {} - {}",
self.message, self.address, err
)
}
}
}
@@ -0,0 +1,83 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::codec::OffchainCodec;
use crate::networking::error::NetworkingError;
use crate::networking::message::OffchainMessage;
use crate::networking::sender::ConnectionConfig;
use futures::{SinkExt, StreamExt};
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_util::codec::Framed;
// this connection only exists for a single message
pub(crate) struct EphemeralConnection {
remote: SocketAddr,
conn: Framed<TcpStream, OffchainCodec>,
}
impl EphemeralConnection {
pub(crate) async fn connect(
address: SocketAddr,
connection_timeout: Duration,
) -> io::Result<Self> {
trace!("attempting to connect to {}", address);
let conn = match timeout(connection_timeout, TcpStream::connect(address)).await {
Err(_timeout) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("timed out while attempting to send message to {}", address),
))
}
Ok(conn_res) => conn_res?,
};
let framed_conn = Framed::new(conn, OffchainCodec);
Ok(Self {
remote: address,
conn: framed_conn,
})
}
pub(crate) fn remote(&self) -> SocketAddr {
self.remote
}
async fn send(
&mut self,
message: &OffchainMessage,
send_timeout: Duration,
response_timeout: Option<Duration>,
) -> Result<Option<OffchainMessage>, NetworkingError> {
trace!("attempting to send to {}", self.remote);
match timeout(send_timeout, self.conn.send(message)).await {
Err(_timeout) => {
return Err(NetworkingError::Io(io::Error::new(
io::ErrorKind::Other,
"timed out while attempting to send message",
)))
}
Ok(res) => res?,
}
if let Some(response_timeout) = response_timeout {
match timeout(response_timeout, self.conn.next()).await {
Err(_elapsed) => Ok(None),
Ok(response) => response.transpose(),
}
} else {
Ok(None)
}
}
pub(crate) async fn connect_and_send(
address: SocketAddr,
cfg: ConnectionConfig,
message: &OffchainMessage,
) -> Result<Option<OffchainMessage>, NetworkingError> {
let mut conn = EphemeralConnection::connect(address, cfg.connection_timeout).await?;
conn.send(message, cfg.send_timeout, cfg.response_timeout)
.await
}
}
@@ -0,0 +1,65 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::networking::error::NetworkingError;
use crate::networking::message::OffchainMessage;
use crate::networking::sender::broadcast::Broadcaster;
use crate::networking::sender::ephemeral::EphemeralConnection;
use std::net::SocketAddr;
use std::time::Duration;
pub(crate) mod broadcast;
pub(crate) mod ephemeral;
#[derive(Debug, Clone, Copy)]
pub(crate) struct ConnectionConfig {
pub(crate) connection_timeout: Duration,
pub(crate) response_timeout: Option<Duration>,
pub(crate) send_timeout: Duration,
}
pub(crate) struct SendResponse {
pub(crate) source: SocketAddr,
pub(crate) response: Result<Option<OffchainMessage>, NetworkingError>,
}
impl SendResponse {
pub(crate) fn new(
source: SocketAddr,
response: Result<Option<OffchainMessage>, NetworkingError>,
) -> Self {
SendResponse { source, response }
}
}
pub(crate) async fn send_single_message(
address: SocketAddr,
cfg: ConnectionConfig,
message: &OffchainMessage,
) -> SendResponse {
let res = EphemeralConnection::connect_and_send(address, cfg, message).await;
SendResponse::new(address, res)
}
pub(crate) async fn broadcast_message(
addresses: Vec<SocketAddr>,
cfg: ConnectionConfig,
message: &OffchainMessage,
) {
Broadcaster::new(addresses, cfg)
.broadcast(message.clone())
.await
}
pub(crate) async fn broadcast_message_with_feedback(
addresses: Vec<SocketAddr>,
cfg: ConnectionConfig,
message: &OffchainMessage,
) -> Vec<SendResponse> {
Broadcaster::new(addresses, cfg)
.broadcast_with_feedback(message.clone())
.await
}
// NOTE: for the original purposes of DKG stateless broadcasts and one-off sends were enough,
// so I never implemented proper persistent connections