Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c21b015fa3 | |||
| dfb7b707e7 | |||
| 66daddac96 | |||
| 06e7b95800 | |||
| 8883f64bd6 | |||
| d9f88ca515 | |||
| 667d8d34eb | |||
| 6b0a743f31 | |||
| e70303fcaa | |||
| 7659db973f | |||
| 72480543cb | |||
| e67e6a9838 | |||
| d9cfec125a | |||
| a72414e359 | |||
| 81548eba3f | |||
| 527fc5dfdd | |||
| 4fabb7a44c | |||
| b401123d59 | |||
| 8f83fd784f | |||
| 867e92c9f5 | |||
| 65895db582 |
Generated
+2
-2
@@ -6521,7 +6521,6 @@ dependencies = [
|
||||
"nym-validator-client",
|
||||
"nym-wireguard",
|
||||
"once_cell",
|
||||
"pretty_env_logger",
|
||||
"rand 0.7.3",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
@@ -6723,7 +6722,6 @@ dependencies = [
|
||||
"nym-types",
|
||||
"nym-validator-client",
|
||||
"opentelemetry",
|
||||
"pretty_env_logger",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6748,6 +6746,7 @@ dependencies = [
|
||||
"log",
|
||||
"nym-bin-common",
|
||||
"nym-crypto",
|
||||
"nym-mixnet-contract-common",
|
||||
"nym-network-defaults",
|
||||
"nym-sphinx-acknowledgements",
|
||||
"nym-sphinx-addressing",
|
||||
@@ -6757,6 +6756,7 @@ dependencies = [
|
||||
"nym-sphinx-types",
|
||||
"nym-task",
|
||||
"nym-validator-client",
|
||||
"parking_lot 0.12.1",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"thiserror",
|
||||
|
||||
+3
-1
@@ -162,8 +162,10 @@ serde_json = "1.0.91"
|
||||
tap = "1.0.1"
|
||||
time = "0.3.30"
|
||||
thiserror = "1.0.48"
|
||||
tokio = "1.24.1"
|
||||
tokio = "1.33.0"
|
||||
tokio-stream = "0.1.14"
|
||||
tokio-tungstenite = "0.20.1"
|
||||
tokio-util = "0.7.9"
|
||||
tracing = "0.1.37"
|
||||
tungstenite = { version = "0.20.1", default-features = false }
|
||||
ts-rs = "7.0.0"
|
||||
|
||||
@@ -47,7 +47,7 @@ nym-credential-storage = { path = "../credential-storage" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||
version = "0.1.11"
|
||||
workspace = true
|
||||
features = ["time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
@@ -55,7 +55,7 @@ workspace = true
|
||||
features = ["time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
|
||||
version = "0.20.1"
|
||||
workspace = true
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
version = "0.6.2"
|
||||
|
||||
@@ -40,7 +40,7 @@ workspace = true
|
||||
features = ["macros", "rt", "net", "sync", "time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||
version = "0.1.11"
|
||||
workspace = true
|
||||
features = ["net", "sync", "time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
|
||||
|
||||
@@ -10,7 +10,7 @@ edition = "2021"
|
||||
futures = { workspace = true }
|
||||
log = { workspace = true }
|
||||
tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
|
||||
# internal
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
|
||||
@@ -123,13 +123,10 @@ impl Client {
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
warn!("Failed to forward packets to {address} - {err}");
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
|
||||
address
|
||||
);
|
||||
debug!("connection manager to {address} is finished. Either the connection failed or mixnet client got dropped");
|
||||
}
|
||||
|
||||
/// If we're trying to reconnect, determine how long we should wait.
|
||||
@@ -207,7 +204,7 @@ impl SendWithoutResponse for Client {
|
||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
@@ -215,10 +212,8 @@ impl SendWithoutResponse for Client {
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
debug!("Connection to {address} seems to be dead. attempting to re-establish it...");
|
||||
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
@@ -238,7 +233,7 @@ impl SendWithoutResponse for Client {
|
||||
}
|
||||
} else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {}", address);
|
||||
debug!("establishing initial connection to {address}");
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
|
||||
@@ -55,6 +55,9 @@ pub use tendermint_rpc::{
|
||||
};
|
||||
pub use tendermint_rpc::{Request, Response, SimpleRequest};
|
||||
|
||||
#[cfg(feature = "http-client")]
|
||||
pub use tendermint_rpc::Url as RpcUrl;
|
||||
|
||||
#[cfg(feature = "http-client")]
|
||||
use crate::http_client;
|
||||
#[cfg(feature = "http-client")]
|
||||
|
||||
@@ -623,6 +623,24 @@ pub enum Layer {
|
||||
Three = 3,
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
pub fn try_next(&self) -> Option<Self> {
|
||||
match self {
|
||||
Layer::One => Some(Layer::Two),
|
||||
Layer::Two => Some(Layer::Three),
|
||||
Layer::Three => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_previous(&self) -> Option<Self> {
|
||||
match self {
|
||||
Layer::One => None,
|
||||
Layer::Two => Some(Layer::One),
|
||||
Layer::Three => Some(Layer::Two),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Layer> for String {
|
||||
fn from(layer: Layer) -> Self {
|
||||
(layer as u8).to_string()
|
||||
|
||||
@@ -12,20 +12,21 @@ futures = { workspace = true }
|
||||
humantime-serde = "1.0"
|
||||
log = { workspace = true }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tokio = { version = "1.24.1", features = [
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
tokio = { workspace = true, features = [
|
||||
"time",
|
||||
"macros",
|
||||
"rt",
|
||||
"net",
|
||||
"io-util",
|
||||
] }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
url = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
|
||||
## tracing
|
||||
tracing = { version = "0.1.37", optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
@@ -37,6 +38,7 @@ nym-sphinx-params = { path = "../nymsphinx/params" }
|
||||
nym-sphinx-types = { path = "../nymsphinx/types" }
|
||||
nym-task = { path = "../task" }
|
||||
nym-validator-client = { path = "../client-libs/validator-client" }
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
|
||||
cfg-if = "1.0.0"
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_validator_client::nyxd::error::NyxdError;
|
||||
use std::net::IpAddr;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ForwardTravelError {
|
||||
#[error("received a connection request from a forbidden address: '{address}'")]
|
||||
DisallowedIngressAddress { address: IpAddr },
|
||||
|
||||
#[error("received a request to open connection to a forbidden address: '{address}'")]
|
||||
DisallowedEgressAddress { address: IpAddr },
|
||||
|
||||
#[error("no valid nyxd urls are available for topology queries")]
|
||||
NoNyxdUrlsAvailable,
|
||||
|
||||
#[error("nyxd interaction failure: {source}")]
|
||||
NyxdFailure {
|
||||
#[from]
|
||||
source: NyxdError,
|
||||
},
|
||||
|
||||
#[error("the current epoch appears to be stuck")]
|
||||
StuckEpoch,
|
||||
}
|
||||
@@ -0,0 +1,356 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::forward_travel::error::ForwardTravelError;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use nym_mixnet_contract_common::{EpochId, GatewayBond, Layer, MixNodeBond};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::nyxd::contract_traits::{MixnetQueryClient, PagedMixnetQueryClient};
|
||||
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
|
||||
use parking_lot::RwLock;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::collections::HashSet;
|
||||
use std::mem;
|
||||
use std::net::{IpAddr, ToSocketAddrs};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use url::Url;
|
||||
|
||||
pub mod error;
|
||||
|
||||
// TODO: to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change
|
||||
pub type AllowedIngress = AllowedPaths;
|
||||
pub type AllowedEgress = AllowedPaths;
|
||||
|
||||
pub struct AllowedAddressesProvider {
|
||||
current_epoch: EpochId,
|
||||
|
||||
identity: IdentityKey,
|
||||
|
||||
client_config: nyxd::Config,
|
||||
|
||||
/// URLs to the nyxd validators for obtaining unfiltered network topology.
|
||||
nyxd_endpoints: Vec<Url>,
|
||||
|
||||
// to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change
|
||||
// ingress: AllowedIngress,
|
||||
// egress: AllowedEgress,
|
||||
allowed: AllowedPaths,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl AllowedAddressesProvider {
|
||||
pub async fn new(
|
||||
identity: IdentityKey,
|
||||
nyxd_endpoints: Vec<Url>,
|
||||
allow_all: bool,
|
||||
network_details: Option<NymNetworkDetails>,
|
||||
) -> Result<Self, ForwardTravelError> {
|
||||
let network = network_details.unwrap_or(NymNetworkDetails::new_mainnet());
|
||||
let mut provider = AllowedAddressesProvider {
|
||||
current_epoch: 0,
|
||||
identity,
|
||||
client_config: nyxd::Config::try_from_nym_network_details(&network)?,
|
||||
nyxd_endpoints,
|
||||
allowed: AllowedPaths::new(allow_all),
|
||||
};
|
||||
|
||||
if !allow_all {
|
||||
// set initial values for ingress/egress
|
||||
let client = provider.ephemeral_nyxd_client()?;
|
||||
provider.update_state(client).await?;
|
||||
}
|
||||
|
||||
Ok(provider)
|
||||
}
|
||||
|
||||
fn ephemeral_nyxd_client(&self) -> Result<QueryHttpRpcNyxdClient, ForwardTravelError> {
|
||||
let mut possible_nyxd_endpoints = self.nyxd_endpoints.clone();
|
||||
possible_nyxd_endpoints.shuffle(&mut thread_rng());
|
||||
|
||||
let mut last_error = match QueryHttpRpcNyxdClient::connect(
|
||||
self.client_config.clone(),
|
||||
possible_nyxd_endpoints
|
||||
.pop()
|
||||
.ok_or(ForwardTravelError::NoNyxdUrlsAvailable)?
|
||||
.as_str(),
|
||||
) {
|
||||
Ok(client) => return Ok(client),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
for url in possible_nyxd_endpoints {
|
||||
match QueryHttpRpcNyxdClient::connect(self.client_config.clone(), url.as_str()) {
|
||||
Ok(client) => return Ok(client),
|
||||
Err(err) => last_error = err,
|
||||
};
|
||||
}
|
||||
|
||||
Err(last_error.into())
|
||||
}
|
||||
|
||||
pub fn ingress(&self) -> AllowedIngress {
|
||||
self.allowed.clone()
|
||||
}
|
||||
|
||||
pub fn egress(&self) -> AllowedEgress {
|
||||
self.allowed.clone()
|
||||
}
|
||||
|
||||
fn add_node_ips(raw_host: &str, identity: &str, set: &mut HashSet<IpAddr>) {
|
||||
if let Ok(ip) = IpAddr::from_str(raw_host) {
|
||||
set.insert(ip);
|
||||
} else {
|
||||
// this might still be a valid hostname
|
||||
//
|
||||
// annoyingly there exists a method of looking up a socket address but not an ip address,
|
||||
// so append any port and perform the lookup
|
||||
let Ok(sockets) = format!("{raw_host}:1789").to_socket_addrs() else {
|
||||
warn!("failed to resolve ip address of node '{identity}' (hostname: '{raw_host}')");
|
||||
return;
|
||||
};
|
||||
|
||||
for socket in sockets {
|
||||
set.insert(socket.ip());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_all_addresses(nodes: &[MixNodeBond], gateways: &[GatewayBond]) -> HashSet<IpAddr> {
|
||||
let mut allowed = HashSet::new();
|
||||
|
||||
for node in nodes.iter() {
|
||||
Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed);
|
||||
}
|
||||
|
||||
for gateway in gateways.iter() {
|
||||
Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed);
|
||||
}
|
||||
|
||||
allowed
|
||||
}
|
||||
|
||||
/// Gets ip addresses of all mixnodes on given layer
|
||||
fn get_addresses_on_layer(layer: Layer, nodes: &[MixNodeBond]) -> HashSet<IpAddr> {
|
||||
let mut allowed = HashSet::new();
|
||||
|
||||
for node in nodes.iter().filter(|m| m.layer == layer) {
|
||||
Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed);
|
||||
}
|
||||
|
||||
allowed
|
||||
}
|
||||
|
||||
fn gateway_addresses(gateways: &[GatewayBond]) -> HashSet<IpAddr> {
|
||||
let mut allowed = HashSet::new();
|
||||
|
||||
for gateway in gateways.iter() {
|
||||
Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed);
|
||||
}
|
||||
|
||||
allowed
|
||||
}
|
||||
|
||||
fn locate_layer(&self, nodes: &[MixNodeBond]) -> Option<Layer> {
|
||||
nodes
|
||||
.iter()
|
||||
.find(|m| m.identity() == self.identity)
|
||||
.map(|m| m.layer)
|
||||
}
|
||||
|
||||
fn is_gateway(&self, gateways: &[GatewayBond]) -> bool {
|
||||
gateways
|
||||
.iter()
|
||||
.any(|g| g.gateway.identity_key == self.identity)
|
||||
}
|
||||
|
||||
async fn update_state(
|
||||
&mut self,
|
||||
client: QueryHttpRpcNyxdClient,
|
||||
) -> Result<(), ForwardTravelError> {
|
||||
let current_interval = client.get_current_interval_details().await?;
|
||||
let current_epoch = current_interval.interval.current_epoch_absolute_id();
|
||||
|
||||
if current_epoch == self.current_epoch {
|
||||
error!("can't update the allowed ips list as the epoch appears to be stuck");
|
||||
return Err(ForwardTravelError::StuckEpoch);
|
||||
}
|
||||
|
||||
let has_epoch_deviated = current_epoch > self.current_epoch + 1;
|
||||
|
||||
let mixnodes = client.get_all_mixnode_bonds().await?;
|
||||
let gateways = client.get_all_gateways().await?;
|
||||
|
||||
let new_allowed = Self::get_all_addresses(&mixnodes, &gateways);
|
||||
|
||||
// I'm leaving this code commented out to preserve this logic for when we need it
|
||||
// to update to proper ingress/egress filtering
|
||||
|
||||
// let our_mix_layer = self.locate_layer(&mixnodes);
|
||||
//
|
||||
// let previous_mix_layer = our_mix_layer.and_then(|l| l.try_previous());
|
||||
// let next_mix_layer = our_mix_layer.and_then(|l| l.try_next());
|
||||
//
|
||||
// let (allowed_ingress, allowed_egress) = match (previous_mix_layer, next_mix_layer) {
|
||||
// // layer 1
|
||||
// (None, Some(next)) => {
|
||||
// let gateways = client.get_all_gateways().await?;
|
||||
//
|
||||
// (
|
||||
// Self::gateway_addresses(&gateways),
|
||||
// Self::get_addresses_on_layer(next, &mixnodes),
|
||||
// )
|
||||
// }
|
||||
// // layer 2
|
||||
// (Some(previous), Some(next)) => (
|
||||
// Self::get_addresses_on_layer(previous, &mixnodes),
|
||||
// Self::get_addresses_on_layer(next, &mixnodes),
|
||||
// ),
|
||||
// // layer 3
|
||||
// (Some(previous), None) => {
|
||||
// let gateways = client.get_all_gateways().await?;
|
||||
// (
|
||||
// Self::get_addresses_on_layer(previous, &mixnodes),
|
||||
// Self::gateway_addresses(&gateways),
|
||||
// )
|
||||
// }
|
||||
// // gateway (or not bonded)
|
||||
// (None, None) => {
|
||||
// let gateways = client.get_all_gateways().await?;
|
||||
//
|
||||
// if self.is_gateway(&gateways) {
|
||||
// let mut base_ingress = Self::get_addresses_on_layer(Layer::Three, &mixnodes);
|
||||
// let mut base_egress = Self::get_addresses_on_layer(Layer::One, &mixnodes);
|
||||
//
|
||||
// // TODO: this extension should be conditional on whether the node is running the vpn module
|
||||
// let gw_extension = Self::gateway_addresses(&gateways);
|
||||
//
|
||||
// base_ingress.extend(gw_extension.clone());
|
||||
// base_egress.extend(gw_extension.clone());
|
||||
//
|
||||
// (base_ingress, base_egress)
|
||||
// } else {
|
||||
// warn!("our node doesn't appear to be bonded - going to permit traffic from ALL mixnodes and gateways");
|
||||
// let all = Self::get_all_addresses(&mixnodes, &gateways);
|
||||
// (all.clone(), all)
|
||||
// }
|
||||
// }
|
||||
// };
|
||||
|
||||
self.current_epoch = current_epoch;
|
||||
self.allowed.advance_epoch(new_allowed, has_epoch_deviated);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_allowed_addresses(&mut self) -> Result<(), ForwardTravelError> {
|
||||
// create new client every epoch because it results in different nyxd endpoint being used
|
||||
// what may help in distributing the load
|
||||
let client = self.ephemeral_nyxd_client()?;
|
||||
self.wait_for_epoch_rollover(&client).await?;
|
||||
self.update_state(client).await
|
||||
}
|
||||
|
||||
async fn wait_for_epoch_rollover(
|
||||
&self,
|
||||
client: &QueryHttpRpcNyxdClient,
|
||||
) -> Result<(), ForwardTravelError> {
|
||||
let current_interval = client.get_current_interval_details().await?;
|
||||
let current_epoch = current_interval.interval.current_epoch_absolute_id();
|
||||
|
||||
if current_epoch <= self.current_epoch {
|
||||
let remaining = current_interval.time_until_current_epoch_end();
|
||||
// add few more seconds to account for block time drift and to spread queries of all
|
||||
// other nodes
|
||||
let adjustment_secs = rand::thread_rng().gen_range(5..90);
|
||||
sleep(remaining + Duration::from_secs(adjustment_secs)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, mut task_client: TaskClient) {
|
||||
if self.allowed.allow_all {
|
||||
// debug_assert!(self.egress.allow_all);
|
||||
|
||||
info!("the forward travel is currently disabled - there's no point in starting the route refresher");
|
||||
task_client.mark_as_success();
|
||||
return;
|
||||
}
|
||||
|
||||
debug!("Started ValidAddressesProvider with graceful shutdown support");
|
||||
while !task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = task_client.recv() => {
|
||||
trace!("ValidAddressesProvider: Received shutdown");
|
||||
}
|
||||
res = self.update_allowed_addresses() => {
|
||||
if let Err(err) = res {
|
||||
warn!("failed to update the allowed addresses: {err}");
|
||||
|
||||
// don't retry immediately in case it was a network failure, wait a bit instead.
|
||||
task_client.wait(Duration::from_secs(5 * 60)).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
task_client.recv_timeout().await;
|
||||
log::debug!("ValidAddressesProvider: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AllowedPaths {
|
||||
// this is fine that this value is not wrapped in an Arc and is not atomic given
|
||||
// it's not expected to be modified at runtime
|
||||
allow_all: bool,
|
||||
inner: Arc<RwLock<AllowedPathsInner>>,
|
||||
}
|
||||
|
||||
impl AllowedPaths {
|
||||
fn new(allow_all: bool) -> Self {
|
||||
AllowedPaths {
|
||||
allow_all,
|
||||
inner: Arc::new(RwLock::new(AllowedPathsInner {
|
||||
previous_epoch: HashSet::new(),
|
||||
current_epoch: HashSet::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_allowed(&self, address: IpAddr) -> bool {
|
||||
if self.allow_all {
|
||||
return true;
|
||||
}
|
||||
|
||||
let guard = self.inner.read();
|
||||
guard.current_epoch.contains(&address) || guard.previous_epoch.contains(&address)
|
||||
}
|
||||
|
||||
fn advance_epoch(&self, current_epoch: HashSet<IpAddr>, reset_previous: bool) {
|
||||
// if this is triggered, it's an implementation bug;
|
||||
// we shouldn't be updating data if we're allowing everything regardless
|
||||
debug_assert!(!self.allow_all);
|
||||
|
||||
let mut guard = self.inner.write();
|
||||
|
||||
let old_current = mem::replace(&mut guard.current_epoch, current_epoch);
|
||||
|
||||
if reset_previous {
|
||||
guard.previous_epoch = HashSet::new()
|
||||
} else {
|
||||
guard.previous_epoch = old_current;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct AllowedPathsInner {
|
||||
previous_epoch: HashSet<IpAddr>,
|
||||
current_epoch: HashSet<IpAddr>,
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub mod forward_travel;
|
||||
pub mod packet_processor;
|
||||
pub mod verloc;
|
||||
|
||||
|
||||
@@ -1,81 +1,50 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
use std::io;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RttError {
|
||||
#[error("the received echo packet had unexpected size")]
|
||||
UnexpectedEchoPacketSize,
|
||||
|
||||
#[error("the received reply packet had unexpected size")]
|
||||
UnexpectedReplyPacketSize,
|
||||
|
||||
#[error("the received echo packet had malformed sender")]
|
||||
MalformedSenderIdentity,
|
||||
|
||||
#[error("the received echo packet had malformed signature")]
|
||||
MalformedEchoSignature,
|
||||
|
||||
#[error("the received reply packet had malformed signature")]
|
||||
MalformedReplySignature,
|
||||
|
||||
#[error("the received echo packet had invalid signature")]
|
||||
InvalidEchoSignature,
|
||||
|
||||
#[error("the received reply packet had invalid signature")]
|
||||
InvalidReplySignature,
|
||||
|
||||
UnreachableNode(String, io::Error),
|
||||
UnexpectedConnectionFailureWrite(String, io::Error),
|
||||
UnexpectedConnectionFailureRead(String, io::Error),
|
||||
#[error("could not establish connection to {0}: {1}")]
|
||||
UnreachableNode(String, #[source] io::Error),
|
||||
|
||||
#[error("failed to write echo packet to {0}: {1}")]
|
||||
UnexpectedConnectionFailureWrite(String, #[source] io::Error),
|
||||
|
||||
#[error("failed to read reply packet from {0}: {1}")]
|
||||
UnexpectedConnectionFailureRead(String, #[source] io::Error),
|
||||
|
||||
#[error("timed out while trying to read reply packet from {0}")]
|
||||
ConnectionReadTimeout(String),
|
||||
|
||||
#[error("timed out while trying to write echo packet to {0}")]
|
||||
ConnectionWriteTimeout(String),
|
||||
|
||||
#[error("the received reply packet had an unexpected sequence number")]
|
||||
UnexpectedReplySequence,
|
||||
|
||||
#[error("shutdown signal received")]
|
||||
ShutdownReceived,
|
||||
}
|
||||
|
||||
impl Display for RttError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
RttError::UnexpectedEchoPacketSize => {
|
||||
write!(f, "The received echo packet had unexpected size")
|
||||
}
|
||||
RttError::UnexpectedReplyPacketSize => {
|
||||
write!(f, "The received reply packet had unexpected size")
|
||||
}
|
||||
RttError::MalformedSenderIdentity => {
|
||||
write!(f, "The received echo packet had malformed sender")
|
||||
}
|
||||
RttError::MalformedEchoSignature => {
|
||||
write!(f, "The received echo packet had malformed signature")
|
||||
}
|
||||
RttError::MalformedReplySignature => {
|
||||
write!(f, "The received reply packet had malformed signature")
|
||||
}
|
||||
RttError::InvalidEchoSignature => {
|
||||
write!(f, "The received echo packet had invalid signature")
|
||||
}
|
||||
RttError::InvalidReplySignature => {
|
||||
write!(f, "The received reply packet had invalid signature")
|
||||
}
|
||||
RttError::UnreachableNode(id, err) => {
|
||||
write!(f, "Could not establish connection to {id} - {err}")
|
||||
}
|
||||
RttError::UnexpectedConnectionFailureWrite(id, err) => {
|
||||
write!(f, "Failed to write echo packet to {id} - {err}")
|
||||
}
|
||||
RttError::UnexpectedConnectionFailureRead(id, err) => {
|
||||
write!(f, "Failed to read reply packet from {id} - {err}")
|
||||
}
|
||||
RttError::ConnectionReadTimeout(id) => {
|
||||
write!(f, "Timed out while trying to read reply packet from {id}")
|
||||
}
|
||||
RttError::ConnectionWriteTimeout(id) => {
|
||||
write!(f, "Timed out while trying to write echo packet to {id}")
|
||||
}
|
||||
RttError::UnexpectedReplySequence => write!(
|
||||
f,
|
||||
"The received reply packet had an unexpected sequence number"
|
||||
),
|
||||
RttError::ShutdownReceived => {
|
||||
write!(f, "Shutdown signal received")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for RttError {}
|
||||
|
||||
@@ -7,13 +7,13 @@ edition = "2021"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tokio-stream = "0.1.11" # this one seems to be a thing until `Stream` trait is stabilised in stdlib
|
||||
tokio-stream = { workspace = true } # this one seems to be a thing until `Stream` trait is stabilised in stdlib
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
workspace = true
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-util]
|
||||
version = "0.7.4"
|
||||
workspace = true
|
||||
features = ["time"]
|
||||
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
|
||||
|
||||
@@ -9,7 +9,7 @@ repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
bytes = "1.0"
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
|
||||
|
||||
@@ -9,7 +9,7 @@ edition = "2021"
|
||||
[dependencies]
|
||||
bytes = "1.0"
|
||||
tokio = { version = "1.24.1", features = [ "net", "io-util", "sync", "macros", "time", "rt-multi-thread" ] }
|
||||
tokio-util = { version = "0.7.4", features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use
|
||||
tokio-util = { workspace = true, features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use
|
||||
# their `read_buf` [from the util crate] replacement rather than having to rethink/reimplement `AvailableReader` with the new AsyncRead trait definition.
|
||||
# In the long run, the dependency should probably get removed in favour of pure-tokio implementation, but for time being it's fine.
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -373,6 +373,14 @@ impl TaskClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait(&mut self, duration: Duration) {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.recv() => (),
|
||||
_ = sleep(duration) => (),
|
||||
}
|
||||
}
|
||||
|
||||
// Create a dummy that will never report that we should shutdown.
|
||||
pub fn dummy() -> TaskClient {
|
||||
let (_notify_tx, notify_rx) = watch::channel(());
|
||||
|
||||
+2
-2
@@ -45,9 +45,9 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_derive = "1.0.149"
|
||||
serde_json = "1.0.91"
|
||||
thiserror = { workspace = true }
|
||||
tokio = { version = "1", features = ["macros", "net","rt-multi-thread"] }
|
||||
tokio = { workspace = true, features = ["macros", "net","rt-multi-thread"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tokio-util = { version = "0.7.4", features = ["full"] }
|
||||
tokio-util = { workspace = true, features = ["full"] }
|
||||
toml = "0.7.0"
|
||||
unsigned-varint = "0.7.1"
|
||||
utoipa = { workspace = true, features = ["actix_extras"] }
|
||||
|
||||
+4
-5
@@ -31,8 +31,7 @@ humantime-serde = "1.0.1"
|
||||
ipnetwork = "0.16"
|
||||
lazy_static = "1.4.0"
|
||||
log = { workspace = true }
|
||||
once_cell = "1.7.2"
|
||||
pretty_env_logger = "0.4"
|
||||
once_cell = { workspace = true }
|
||||
rand = "0.7"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
@@ -51,9 +50,9 @@ tokio = { workspace = true, features = [
|
||||
"fs",
|
||||
"time",
|
||||
] }
|
||||
tokio-stream = { version = "0.1.11", features = ["fs"] }
|
||||
tokio-tungstenite = { version = "0.20.1" }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
tokio-stream = { workspace = true, features = ["fs"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ pub(crate) struct OverrideConfig {
|
||||
pub(crate) statistics_service_url: Option<url::Url>,
|
||||
pub(crate) nym_apis: Option<Vec<url::Url>>,
|
||||
pub(crate) mnemonic: Option<bip39::Mnemonic>,
|
||||
pub(crate) enforce_forward_travel: Option<bool>,
|
||||
pub(crate) nyxd_urls: Option<Vec<url::Url>>,
|
||||
pub(crate) only_coconut_credentials: Option<bool>,
|
||||
pub(crate) with_network_requester: Option<bool>,
|
||||
@@ -52,6 +53,10 @@ impl OverrideConfig {
|
||||
.with_optional(Config::with_listening_address, self.listening_address)
|
||||
.with_optional(Config::with_mix_port, self.mix_port)
|
||||
.with_optional(Config::with_clients_port, self.clients_port)
|
||||
.with_optional(
|
||||
Config::with_enforce_forward_travel,
|
||||
self.enforce_forward_travel,
|
||||
)
|
||||
.with_optional_custom_env(
|
||||
Config::with_custom_nym_apis,
|
||||
self.nym_apis,
|
||||
|
||||
@@ -20,41 +20,41 @@ use super::helpers::OverrideIpPacketRouterConfig;
|
||||
#[derive(Args, Clone)]
|
||||
pub struct Init {
|
||||
/// Id of the gateway we want to create config for
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
id: String,
|
||||
|
||||
/// The listening address on which the gateway will be receiving sphinx packets and listening for client data
|
||||
#[clap(long, alias = "host")]
|
||||
#[arg(long, alias = "host")]
|
||||
listening_address: IpAddr,
|
||||
|
||||
/// Comma separated list of public ip addresses that will announced to the nym-api and subsequently to the clients.
|
||||
/// In nearly all circumstances, it's going to be identical to the address you're going to use for bonding.
|
||||
#[clap(long, value_delimiter = ',')]
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
public_ips: Option<Vec<IpAddr>>,
|
||||
|
||||
/// Optional hostname associated with this gateway that will announced to the nym-api and subsequently to the clients
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
hostname: Option<String>,
|
||||
|
||||
/// The port on which the gateway will be listening for sphinx packets
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
mix_port: Option<u16>,
|
||||
|
||||
/// The port on which the gateway will be listening for clients gateway-requests
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
clients_port: Option<u16>,
|
||||
|
||||
/// Path to sqlite database containing all gateway persistent data
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
datastore: Option<PathBuf>,
|
||||
|
||||
/// Comma separated list of endpoints of nym APIs
|
||||
#[clap(long, alias = "validator_apis", value_delimiter = ',')]
|
||||
#[arg(long, alias = "validator_apis", value_delimiter = ',')]
|
||||
// the alias here is included for backwards compatibility (1.1.4 and before)
|
||||
nym_apis: Option<Vec<url::Url>>,
|
||||
|
||||
/// Comma separated list of endpoints of the validator
|
||||
#[clap(
|
||||
#[arg(
|
||||
long,
|
||||
alias = "validators",
|
||||
alias = "nyxd_validators",
|
||||
@@ -65,47 +65,52 @@ pub struct Init {
|
||||
nyxd_urls: Option<Vec<url::Url>>,
|
||||
|
||||
/// Cosmos wallet mnemonic needed for double spending protection
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
mnemonic: Option<bip39::Mnemonic>,
|
||||
|
||||
/// Set this gateway to work only with coconut credentials; that would disallow clients to
|
||||
/// bypass bandwidth credential requirement
|
||||
#[clap(long, hide = true)]
|
||||
#[arg(long, hide = true)]
|
||||
only_coconut_credentials: Option<bool>,
|
||||
|
||||
/// Enable/disable gateway anonymized statistics that get sent to a statistics aggregator server
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
enabled_statistics: Option<bool>,
|
||||
|
||||
/// URL where a statistics aggregator is running. The default value is a Nym aggregator server
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
statistics_service_url: Option<url::Url>,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer
|
||||
#[arg(long)]
|
||||
enforce_forward_travel: bool,
|
||||
|
||||
/// Allows this gateway to run an embedded network requester for minimal network overhead
|
||||
#[clap(long, conflicts_with = "with_ip_packet_router")]
|
||||
#[arg(long, conflicts_with = "with_ip_packet_router")]
|
||||
with_network_requester: bool,
|
||||
|
||||
/// Allows this gateway to run an embedded network requester for minimal network overhead
|
||||
#[clap(long, hide = true, conflicts_with = "with_network_requester")]
|
||||
#[arg(long, hide = true, conflicts_with = "with_network_requester")]
|
||||
with_ip_packet_router: bool,
|
||||
|
||||
// ##### NETWORK REQUESTER FLAGS #####
|
||||
/// Specifies whether this network requester should run in 'open-proxy' mode
|
||||
#[clap(long, requires = "with_network_requester")]
|
||||
#[arg(long, requires = "with_network_requester")]
|
||||
open_proxy: Option<bool>,
|
||||
|
||||
/// Enable service anonymized statistics that get sent to a statistics aggregator server
|
||||
#[clap(long, requires = "with_network_requester")]
|
||||
#[arg(long, requires = "with_network_requester")]
|
||||
enable_statistics: Option<bool>,
|
||||
|
||||
/// Mixnet client address where a statistics aggregator is running. The default value is a Nym
|
||||
/// aggregator client
|
||||
#[clap(long, requires = "with_network_requester")]
|
||||
#[arg(long, requires = "with_network_requester")]
|
||||
statistics_recipient: Option<String>,
|
||||
|
||||
/// Mostly debug-related option to increase default traffic rate so that you would not need to
|
||||
/// modify config post init
|
||||
#[clap(
|
||||
#[arg(
|
||||
long,
|
||||
hide = true,
|
||||
conflicts_with = "medium_toggle",
|
||||
@@ -114,7 +119,7 @@ pub struct Init {
|
||||
fastmode: bool,
|
||||
|
||||
/// Disable loop cover traffic and the Poisson rate limiter (for debugging only)
|
||||
#[clap(
|
||||
#[arg(
|
||||
long,
|
||||
hide = true,
|
||||
conflicts_with = "medium_toggle",
|
||||
@@ -124,7 +129,7 @@ pub struct Init {
|
||||
|
||||
/// Enable medium mixnet traffic, for experiments only.
|
||||
/// This includes things like disabling cover traffic, no per hop delays, etc.
|
||||
#[clap(
|
||||
#[arg(
|
||||
long,
|
||||
hide = true,
|
||||
conflicts_with = "no_cover",
|
||||
@@ -136,10 +141,10 @@ pub struct Init {
|
||||
/// Specifies whether this network requester will run using the default ExitPolicy
|
||||
/// as opposed to the allow list.
|
||||
/// Note: this setting will become the default in the future releases.
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
with_exit_policy: Option<bool>,
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
#[arg(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
}
|
||||
|
||||
@@ -154,6 +159,7 @@ impl From<Init> for OverrideConfig {
|
||||
datastore: init_config.datastore,
|
||||
nym_apis: init_config.nym_apis,
|
||||
mnemonic: init_config.mnemonic,
|
||||
enforce_forward_travel: Some(init_config.enforce_forward_travel),
|
||||
|
||||
enabled_statistics: init_config.enabled_statistics,
|
||||
statistics_service_url: init_config.statistics_service_url,
|
||||
@@ -302,6 +308,7 @@ mod tests {
|
||||
no_cover: false,
|
||||
medium_toggle: false,
|
||||
with_exit_policy: None,
|
||||
enforce_forward_travel: false,
|
||||
};
|
||||
std::env::set_var(BECH32_PREFIX, "n");
|
||||
|
||||
|
||||
@@ -108,6 +108,11 @@ pub struct Run {
|
||||
#[arg(long)]
|
||||
statistics_recipient: Option<String>,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer
|
||||
#[arg(long)]
|
||||
enforce_forward_travel: Option<bool>,
|
||||
|
||||
/// Mostly debug-related option to increase default traffic rate so that you would not need to
|
||||
/// modify config post init
|
||||
#[arg(long, hide = true, conflicts_with = "medium_toggle")]
|
||||
@@ -157,6 +162,7 @@ impl From<Run> for OverrideConfig {
|
||||
datastore: run_config.datastore,
|
||||
nym_apis: run_config.nym_apis,
|
||||
mnemonic: run_config.mnemonic,
|
||||
enforce_forward_travel: run_config.enforce_forward_travel,
|
||||
|
||||
enabled_statistics: run_config.enabled_statistics,
|
||||
statistics_service_url: run_config.statistics_service_url,
|
||||
|
||||
@@ -188,11 +188,13 @@ impl Config {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_enabled_network_requester(mut self, enabled_network_requester: bool) -> Self {
|
||||
self.network_requester.enabled = enabled_network_requester;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_default_network_requester_config_path(mut self) -> Self {
|
||||
self.storage_paths = self
|
||||
.storage_paths
|
||||
@@ -212,36 +214,43 @@ impl Config {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_only_coconut_credentials(mut self, only_coconut_credentials: bool) -> Self {
|
||||
self.gateway.only_coconut_credentials = only_coconut_credentials;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_enabled_statistics(mut self, enabled_statistics: bool) -> Self {
|
||||
self.gateway.enabled_statistics = enabled_statistics;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_custom_statistics_service_url(mut self, statistics_service_url: Url) -> Self {
|
||||
self.gateway.statistics_service_url = statistics_service_url;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec<Url>) -> Self {
|
||||
self.gateway.nym_api_urls = nym_api_urls;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_custom_validator_nyxd(mut self, validator_nyxd_urls: Vec<Url>) -> Self {
|
||||
self.gateway.nyxd_urls = validator_nyxd_urls;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_cosmos_mnemonic(mut self, cosmos_mnemonic: bip39::Mnemonic) -> Self {
|
||||
self.gateway.cosmos_mnemonic = cosmos_mnemonic;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self {
|
||||
self.gateway.listening_address = listening_address;
|
||||
|
||||
@@ -253,16 +262,25 @@ impl Config {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_mix_port(mut self, port: u16) -> Self {
|
||||
self.gateway.mix_port = port;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_clients_port(mut self, port: u16) -> Self {
|
||||
self.gateway.clients_port = port;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self {
|
||||
self.debug.enforce_forward_travel = forward_travel;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_custom_persistent_store(mut self, store_dir: PathBuf) -> Self {
|
||||
self.storage_paths.clients_storage = store_dir;
|
||||
self
|
||||
@@ -328,7 +346,8 @@ pub struct Gateway {
|
||||
#[zeroize(skip)]
|
||||
pub nym_api_urls: Vec<Url>,
|
||||
|
||||
/// Addresses to validators which the node uses to check for double spending of ERC20 tokens.
|
||||
/// Addresses to nyxd validators via which the node can communicate with the chain directly,
|
||||
/// including for checking for double spending of coconut credentials.
|
||||
#[serde(alias = "validator_nymd_urls")]
|
||||
#[zeroize(skip)]
|
||||
pub nyxd_urls: Vec<Url>,
|
||||
@@ -421,6 +440,10 @@ pub struct Debug {
|
||||
/// Number of messages from offline client that can be pulled at once from the storage.
|
||||
pub message_retrieval_limit: i64,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer.
|
||||
pub enforce_forward_travel: bool,
|
||||
|
||||
/// Specifies whether the mixnode should be using the legacy framing for the sphinx packets.
|
||||
// it's set to true by default. The reason for that decision is to preserve compatibility with the
|
||||
// existing nodes whilst everyone else is upgrading and getting the code for handling the new field.
|
||||
@@ -438,6 +461,9 @@ impl Default for Debug {
|
||||
maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE,
|
||||
stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH,
|
||||
message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT,
|
||||
|
||||
// let's keep it disabled for now to not surprise operators/users
|
||||
enforce_forward_travel: false,
|
||||
use_legacy_framed_packet_version: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,6 +163,7 @@ impl From<ConfigV1_1_31> for Config {
|
||||
presence_sending_delay: value.debug.presence_sending_delay,
|
||||
stored_messages_filename_length: value.debug.stored_messages_filename_length,
|
||||
message_retrieval_limit: value.debug.message_retrieval_limit,
|
||||
enforce_forward_travel: false,
|
||||
use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -116,4 +116,10 @@ ip_packet_router_config = '{{ storage_paths.ip_packet_router_config }}'
|
||||
|
||||
# TODO
|
||||
|
||||
[debug]
|
||||
|
||||
# Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
# on the next mix layer.
|
||||
enforce_forward_travel = {{ debug.enforce_forward_travel }}
|
||||
|
||||
"#;
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::node::storage::error::StorageError;
|
||||
use nym_ip_packet_router::error::IpPacketRouterError;
|
||||
use nym_mixnode_common::forward_travel::error::ForwardTravelError;
|
||||
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
|
||||
use nym_validator_client::nyxd::error::NyxdError;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
@@ -131,6 +132,12 @@ pub(crate) enum GatewayError {
|
||||
source: NyxdError,
|
||||
},
|
||||
|
||||
#[error("failure in enforcing forward travel of mix packets: {source}")]
|
||||
ForwardTravel {
|
||||
#[from]
|
||||
source: ForwardTravelError,
|
||||
},
|
||||
|
||||
// TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors
|
||||
#[error(transparent)]
|
||||
NymNodeError(#[from] nym_node::error::NymNodeError),
|
||||
|
||||
@@ -15,12 +15,12 @@ use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::coconut::CoconutApiError;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::net::SocketAddr;
|
||||
use std::{convert::TryFrom, process, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
|
||||
|
||||
use std::{convert::TryFrom, process, time::Duration};
|
||||
|
||||
use crate::node::{
|
||||
client_handling::{
|
||||
bandwidth::Bandwidth,
|
||||
@@ -40,13 +40,13 @@ pub(crate) enum RequestHandlingError {
|
||||
#[error("Internal gateway storage error")]
|
||||
StorageError(#[from] StorageError),
|
||||
|
||||
#[error("Provided bandwidth IV is malformed - {0}")]
|
||||
#[error("Provided bandwidth IV is malformed: {0}")]
|
||||
MalformedIV(#[from] IVConversionError),
|
||||
|
||||
#[error("Provided binary request was malformed - {0}")]
|
||||
#[error("Provided binary request was malformed: {0}")]
|
||||
InvalidBinaryRequest(#[from] GatewayRequestsError),
|
||||
|
||||
#[error("Provided binary request was malformed - {0}")]
|
||||
#[error("Provided binary request was malformed: {0}")]
|
||||
InvalidTextRequest(<ClientControlRequest as TryFrom<String>>::Error),
|
||||
|
||||
#[error("The received request is not valid in the current context")]
|
||||
@@ -61,10 +61,10 @@ pub(crate) enum RequestHandlingError {
|
||||
#[error("This gateway is only accepting coconut credentials for bandwidth")]
|
||||
OnlyCoconutCredentials,
|
||||
|
||||
#[error("Nyxd Error - {0}")]
|
||||
#[error("Nyxd Error: {0}")]
|
||||
NyxdError(#[from] nym_validator_client::nyxd::error::NyxdError),
|
||||
|
||||
#[error("Validator API error - {0}")]
|
||||
#[error("Validator API error: {0}")]
|
||||
APIError(#[from] nym_validator_client::ValidatorClientError),
|
||||
|
||||
#[error("Not enough nym API endpoints provided. Needed {needed}, received {received}")]
|
||||
@@ -73,14 +73,17 @@ pub(crate) enum RequestHandlingError {
|
||||
#[error("There was a problem with the proposal id: {reason}")]
|
||||
ProposalIdError { reason: String },
|
||||
|
||||
#[error("Coconut interface error - {0}")]
|
||||
#[error("Coconut interface error: {0}")]
|
||||
CoconutInterfaceError(#[from] nym_coconut_interface::error::CoconutInterfaceError),
|
||||
|
||||
#[error("coconut api query failure: {0}")]
|
||||
CoconutApiError(#[from] CoconutApiError),
|
||||
|
||||
#[error("Credential error - {0}")]
|
||||
#[error("Credential error: {0}")]
|
||||
CredentialError(#[from] nym_credentials::error::Error),
|
||||
|
||||
#[error("the outbound address of the received packet ('{address}') is forbidden as there are no nodes with that address on the next layer")]
|
||||
InvalidForwardHop { address: SocketAddr },
|
||||
}
|
||||
|
||||
impl RequestHandlingError {
|
||||
@@ -203,11 +206,25 @@ where
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||
fn forward_packet(&self, mix_packet: MixPacket) {
|
||||
fn forward_packet(&self, mix_packet: MixPacket) -> Result<(), RequestHandlingError> {
|
||||
let next_hop: SocketAddr = mix_packet.next_hop().into();
|
||||
|
||||
// TODO: another option is to move this filter
|
||||
// (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times)
|
||||
// to packet forwarder where we could be filtering at the time of attempting to open new outbound connections
|
||||
// However, in that case we wouldn't be able to return an error message to the client
|
||||
if !self.inner.allowed_egress.is_allowed(next_hop.ip()) {
|
||||
// TODO: perhaps this should get lowered in severity?
|
||||
warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet");
|
||||
return Err(RequestHandlingError::InvalidForwardHop { address: next_hop });
|
||||
}
|
||||
|
||||
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
|
||||
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
|
||||
process::exit(1);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tries to handle the received bandwidth request by checking correctness of the received data
|
||||
@@ -317,7 +334,7 @@ where
|
||||
}
|
||||
|
||||
self.consume_bandwidth(consumed_bandwidth).await?;
|
||||
self.forward_packet(mix_packet);
|
||||
self.forward_packet(mix_packet)?;
|
||||
|
||||
Ok(ServerResponse::Send {
|
||||
remaining_bandwidth: available_bandwidth - consumed_bandwidth,
|
||||
|
||||
@@ -18,6 +18,7 @@ use nym_gateway_requests::{
|
||||
BinaryResponse, PROTOCOL_VERSION,
|
||||
};
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_mixnode_common::forward_travel::AllowedEgress;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::{convert::TryFrom, sync::Arc, time::Duration};
|
||||
@@ -89,6 +90,7 @@ impl InitialAuthenticationError {
|
||||
pub(crate) struct FreshHandler<R, S, St> {
|
||||
rng: R,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
pub(crate) allowed_egress: AllowedEgress,
|
||||
pub(crate) only_coconut_credentials: bool,
|
||||
pub(crate) active_clients_store: ActiveClientsStore,
|
||||
pub(crate) outbound_mix_sender: MixForwardingSender,
|
||||
@@ -110,6 +112,7 @@ where
|
||||
pub(crate) fn new(
|
||||
rng: R,
|
||||
conn: S,
|
||||
allowed_egress: AllowedEgress,
|
||||
only_coconut_credentials: bool,
|
||||
outbound_mix_sender: MixForwardingSender,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
@@ -126,6 +129,7 @@ where
|
||||
local_identity,
|
||||
storage,
|
||||
coconut_verifier,
|
||||
allowed_egress,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::node::storage::Storage;
|
||||
use log::*;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_mixnode_common::forward_travel::AllowedEgress;
|
||||
use rand::rngs::OsRng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
@@ -16,6 +17,7 @@ use tokio::task::JoinHandle;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
allowed_egress: AllowedEgress,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
only_coconut_credentials: bool,
|
||||
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
|
||||
@@ -24,12 +26,14 @@ pub(crate) struct Listener {
|
||||
impl Listener {
|
||||
pub(crate) fn new(
|
||||
address: SocketAddr,
|
||||
allowed_egress: AllowedEgress,
|
||||
local_identity: Arc<identity::KeyPair>,
|
||||
only_coconut_credentials: bool,
|
||||
coconut_verifier: Arc<CoconutVerifier>,
|
||||
) -> Self {
|
||||
Listener {
|
||||
address,
|
||||
allowed_egress,
|
||||
local_identity,
|
||||
only_coconut_credentials,
|
||||
coconut_verifier,
|
||||
@@ -71,6 +75,7 @@ impl Listener {
|
||||
let handle = FreshHandler::new(
|
||||
OsRng,
|
||||
socket,
|
||||
self.allowed_egress.clone(),
|
||||
self.only_coconut_credentials,
|
||||
outbound_mix_sender.clone(),
|
||||
Arc::clone(&self.local_identity),
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use crate::node::storage::Storage;
|
||||
use log::*;
|
||||
use nym_mixnode_common::forward_travel::AllowedIngress;
|
||||
use nym_task::TaskClient;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
@@ -11,13 +12,22 @@ use tokio::task::JoinHandle;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
allowed_ingress: AllowedIngress,
|
||||
shutdown: TaskClient,
|
||||
}
|
||||
|
||||
// TODO: this file is nearly identical to the one in mixnode
|
||||
impl Listener {
|
||||
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
|
||||
Listener { address, shutdown }
|
||||
pub(crate) fn new(
|
||||
address: SocketAddr,
|
||||
allowed_ingress: AllowedIngress,
|
||||
shutdown: TaskClient,
|
||||
) -> Self {
|
||||
Listener {
|
||||
address,
|
||||
allowed_ingress,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run<St>(&mut self, connection_handler: ConnectionHandler<St>)
|
||||
@@ -42,6 +52,12 @@ impl Listener {
|
||||
connection = tcp_listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
if !self.allowed_ingress.is_allowed(remote_addr.ip()) {
|
||||
// TODO: perhaps this should get lowered in severity?
|
||||
warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection");
|
||||
continue
|
||||
}
|
||||
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone().named(format!("MixnetConnectionHandler_{remote_addr}"))));
|
||||
}
|
||||
|
||||
+36
-1
@@ -28,6 +28,7 @@ use futures::channel::{mpsc, oneshot};
|
||||
use log::*;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
|
||||
use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter};
|
||||
use nym_node::wireguard::types::GatewayClientRegistry;
|
||||
@@ -178,6 +179,7 @@ impl<St> Gateway<St> {
|
||||
&self,
|
||||
ack_sender: MixForwardingSender,
|
||||
active_clients_store: ActiveClientsStore,
|
||||
ingress: AllowedIngress,
|
||||
shutdown: TaskClient,
|
||||
) where
|
||||
St: Storage + Clone + 'static,
|
||||
@@ -199,7 +201,8 @@ impl<St> Gateway<St> {
|
||||
self.config.gateway.mix_port,
|
||||
);
|
||||
|
||||
mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler);
|
||||
mixnet_handling::Listener::new(listening_address, ingress, shutdown)
|
||||
.start(connection_handler);
|
||||
}
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
@@ -212,6 +215,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
fn start_client_websocket_listener(
|
||||
&self,
|
||||
allowed_egress: AllowedEgress,
|
||||
forwarding_channel: MixForwardingSender,
|
||||
active_clients_store: ActiveClientsStore,
|
||||
shutdown: TaskClient,
|
||||
@@ -228,6 +232,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
websocket::Listener::new(
|
||||
listening_address,
|
||||
allowed_egress,
|
||||
Arc::clone(&self.identity_keypair),
|
||||
self.config.gateway.only_coconut_credentials,
|
||||
coconut_verifier,
|
||||
@@ -240,6 +245,28 @@ impl<St> Gateway<St> {
|
||||
);
|
||||
}
|
||||
|
||||
async fn start_allowed_addresses_provider(
|
||||
&self,
|
||||
task_client: TaskClient,
|
||||
) -> Result<(AllowedIngress, AllowedEgress), GatewayError> {
|
||||
let identity = self.identity_keypair.public_key().to_base58_string();
|
||||
let nyxd_endpoints = self.config.gateway.nyxd_urls.clone();
|
||||
|
||||
let network = NymNetworkDetails::new_from_env();
|
||||
let mut provider = AllowedAddressesProvider::new(
|
||||
identity,
|
||||
nyxd_endpoints,
|
||||
!self.config.debug.enforce_forward_travel,
|
||||
Some(network),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let filters = (provider.ingress(), provider.egress());
|
||||
|
||||
tokio::spawn(async move { provider.run(task_client).await });
|
||||
Ok(filters)
|
||||
}
|
||||
|
||||
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
|
||||
info!("Starting mix packet forwarder...");
|
||||
|
||||
@@ -449,6 +476,12 @@ impl<St> Gateway<St> {
|
||||
|
||||
let shutdown = TaskManager::new(10);
|
||||
|
||||
let (ingress, egress) = self
|
||||
.start_allowed_addresses_provider(
|
||||
shutdown.subscribe().named("AllowedAddressesProvider"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let coconut_verifier = {
|
||||
let nyxd_client = self.random_nyxd_client()?;
|
||||
CoconutVerifier::new(nyxd_client)
|
||||
@@ -461,6 +494,7 @@ impl<St> Gateway<St> {
|
||||
self.start_mix_socket_listener(
|
||||
mix_forwarding_channel.clone(),
|
||||
active_clients_store.clone(),
|
||||
ingress,
|
||||
shutdown.subscribe().named("mixnet_handling::Listener"),
|
||||
);
|
||||
|
||||
@@ -478,6 +512,7 @@ impl<St> Gateway<St> {
|
||||
}
|
||||
|
||||
self.start_client_websocket_listener(
|
||||
egress,
|
||||
mix_forwarding_channel.clone(),
|
||||
active_clients_store.clone(),
|
||||
shutdown.subscribe().named("websocket::Listener"),
|
||||
|
||||
+4
-5
@@ -18,7 +18,7 @@ rust-version = "1.58.1"
|
||||
|
||||
[dependencies]
|
||||
axum = { workspace = true }
|
||||
anyhow = "1.0.40"
|
||||
anyhow = { workspace = true }
|
||||
bs58 = "0.4.0"
|
||||
clap = { workspace = true, features = ["cargo", "derive"] }
|
||||
colored = "2.0"
|
||||
@@ -26,15 +26,14 @@ cupid = "0.6.1"
|
||||
dirs = "4.0"
|
||||
futures = { workspace = true }
|
||||
humantime-serde = "1.0"
|
||||
lazy_static = "1.4.0"
|
||||
lazy_static = { workspace = true }
|
||||
log = { workspace = true }
|
||||
pretty_env_logger = "0.4.0"
|
||||
rand = "0.7.3"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sysinfo = "0.27.7"
|
||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
toml = "0.5.8"
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
cfg-if = "1.0.0"
|
||||
|
||||
@@ -16,31 +16,40 @@ use std::{fs, io};
|
||||
#[derive(Args, Clone)]
|
||||
pub(crate) struct Init {
|
||||
/// Id of the mixnode we want to create config for
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
id: String,
|
||||
|
||||
/// The host on which the mixnode will be running
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
host: IpAddr,
|
||||
|
||||
/// The port on which the mixnode will be listening for mix packets
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
mix_port: Option<u16>,
|
||||
|
||||
/// The port on which the mixnode will be listening for verloc packets
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
verloc_port: Option<u16>,
|
||||
|
||||
/// The port on which the mixnode will be listening for http requests
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
http_api_port: Option<u16>,
|
||||
|
||||
/// Comma separated list of nym-api endpoints of the validators
|
||||
// the alias here is included for backwards compatibility (1.1.4 and before)
|
||||
#[clap(long, alias = "validators", value_delimiter = ',')]
|
||||
#[arg(long, alias = "validators", value_delimiter = ',')]
|
||||
nym_apis: Option<Vec<url::Url>>,
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
/// Comma separated list of endpoints of the nyxd validators
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
nyxd_urls: Option<Vec<url::Url>>,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer
|
||||
#[arg(long)]
|
||||
enforce_forward_travel: bool,
|
||||
|
||||
#[arg(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
}
|
||||
|
||||
@@ -52,7 +61,9 @@ impl From<Init> for OverrideConfig {
|
||||
mix_port: init_config.mix_port,
|
||||
verloc_port: init_config.verloc_port,
|
||||
http_api_port: init_config.http_api_port,
|
||||
enforce_forward_travel: Some(init_config.enforce_forward_travel),
|
||||
nym_apis: init_config.nym_apis,
|
||||
nyxd_urls: init_config.nyxd_urls,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use colored::Colorize;
|
||||
use log::{error, info, warn};
|
||||
use nym_bin_common::completions::{fig_generate, ArgShell};
|
||||
use nym_bin_common::version_checker;
|
||||
use nym_config::defaults::var_names::{BECH32_PREFIX, NYM_API};
|
||||
use nym_config::defaults::var_names;
|
||||
use nym_config::OptionalSet;
|
||||
use nym_crypto::bech32_address_validation;
|
||||
use std::net::IpAddr;
|
||||
@@ -58,7 +58,9 @@ struct OverrideConfig {
|
||||
mix_port: Option<u16>,
|
||||
verloc_port: Option<u16>,
|
||||
http_api_port: Option<u16>,
|
||||
enforce_forward_travel: Option<bool>,
|
||||
nym_apis: Option<Vec<url::Url>>,
|
||||
nyxd_urls: Option<Vec<url::Url>>,
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: Cli) -> anyhow::Result<()> {
|
||||
@@ -83,22 +85,32 @@ fn override_config(config: Config, args: OverrideConfig) -> Config {
|
||||
.with_optional(Config::with_mix_port, args.mix_port)
|
||||
.with_optional(Config::with_verloc_port, args.verloc_port)
|
||||
.with_optional(Config::with_http_api_port, args.http_api_port)
|
||||
.with_optional(
|
||||
Config::with_enforce_forward_travel,
|
||||
args.enforce_forward_travel,
|
||||
)
|
||||
.with_optional_custom_env(
|
||||
Config::with_custom_nym_apis,
|
||||
args.nym_apis,
|
||||
NYM_API,
|
||||
var_names::NYM_API,
|
||||
nym_config::parse_urls,
|
||||
)
|
||||
.with_optional_custom_env(
|
||||
Config::with_custom_nyxd,
|
||||
args.nyxd_urls,
|
||||
var_names::NYXD,
|
||||
nym_config::parse_urls,
|
||||
)
|
||||
}
|
||||
|
||||
/// Ensures that a given bech32 address is valid, or exits
|
||||
pub(crate) fn validate_bech32_address_or_exit(address: &str) {
|
||||
let prefix = std::env::var(BECH32_PREFIX).expect("bech32 prefix not set");
|
||||
let prefix = std::env::var(var_names::BECH32_PREFIX).expect("bech32 prefix not set");
|
||||
if let Err(bech32_address_validation::Bech32Error::DecodeFailed(err)) =
|
||||
bech32_address_validation::try_bech32_decode(address)
|
||||
{
|
||||
let error_message = format!("Error: wallet address decoding failed: {err}").red();
|
||||
error!("{}", error_message);
|
||||
error!("{error_message}");
|
||||
error!("Exiting...");
|
||||
process::exit(1);
|
||||
}
|
||||
@@ -107,7 +119,7 @@ pub(crate) fn validate_bech32_address_or_exit(address: &str) {
|
||||
bech32_address_validation::validate_bech32_prefix(&prefix, address)
|
||||
{
|
||||
let error_message = format!("Error: wallet address type is wrong, {err}").red();
|
||||
error!("{}", error_message);
|
||||
error!("{error_message}");
|
||||
error!("Exiting...");
|
||||
process::exit(1);
|
||||
}
|
||||
|
||||
@@ -14,35 +14,44 @@ use std::net::IpAddr;
|
||||
#[derive(Args, Clone)]
|
||||
pub(crate) struct Run {
|
||||
/// Id of the nym-mixnode we want to run
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
id: String,
|
||||
|
||||
/// The custom host on which the mixnode will be running
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
host: Option<IpAddr>,
|
||||
|
||||
/// The wallet address you will use to bond this mixnode, e.g. nymt1z9egw0knv47nmur0p8vk4rcx59h9gg4zuxrrr9
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
wallet_address: Option<nyxd::AccountId>,
|
||||
|
||||
/// The port on which the mixnode will be listening for mix packets
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
mix_port: Option<u16>,
|
||||
|
||||
/// The port on which the mixnode will be listening for verloc packets
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
verloc_port: Option<u16>,
|
||||
|
||||
/// The port on which the mixnode will be listening for http requests
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
http_api_port: Option<u16>,
|
||||
|
||||
/// Comma separated list of nym-api endpoints of the validators
|
||||
// the alias here is included for backwards compatibility (1.1.4 and before)
|
||||
#[clap(long, alias = "validators", value_delimiter = ',')]
|
||||
#[arg(long, alias = "validators", value_delimiter = ',')]
|
||||
nym_apis: Option<Vec<url::Url>>,
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
/// Comma separated list of endpoints of the nyxd validators
|
||||
#[arg(long, value_delimiter = ',')]
|
||||
nyxd_urls: Option<Vec<url::Url>>,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer
|
||||
#[arg(long)]
|
||||
enforce_forward_travel: Option<bool>,
|
||||
|
||||
#[arg(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
}
|
||||
|
||||
@@ -54,7 +63,9 @@ impl From<Run> for OverrideConfig {
|
||||
mix_port: run_config.mix_port,
|
||||
verloc_port: run_config.verloc_port,
|
||||
http_api_port: run_config.http_api_port,
|
||||
enforce_forward_travel: run_config.enforce_forward_travel,
|
||||
nym_apis: run_config.nym_apis,
|
||||
nyxd_urls: run_config.nyxd_urls,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,11 +175,19 @@ impl Config {
|
||||
}
|
||||
|
||||
// builder methods
|
||||
#[must_use]
|
||||
pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec<Url>) -> Self {
|
||||
self.mixnode.nym_api_urls = nym_api_urls;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_custom_nyxd(mut self, nyxd_urls: Vec<Url>) -> Self {
|
||||
self.mixnode.nyxd_urls = nyxd_urls;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self {
|
||||
self.mixnode.listening_address = listening_address;
|
||||
|
||||
@@ -189,22 +197,31 @@ impl Config {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_mix_port(mut self, port: u16) -> Self {
|
||||
self.mixnode.mix_port = port;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_verloc_port(mut self, port: u16) -> Self {
|
||||
self.mixnode.verloc_port = port;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_http_api_port(mut self, port: u16) -> Self {
|
||||
let http_ip = self.http.bind_address.ip();
|
||||
self.http.bind_address = SocketAddr::new(http_ip, port);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self {
|
||||
self.debug.enforce_forward_travel = forward_travel;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn get_nym_api_endpoints(&self) -> Vec<Url> {
|
||||
self.mixnode.nym_api_urls.clone()
|
||||
}
|
||||
@@ -231,6 +248,9 @@ pub struct MixNode {
|
||||
|
||||
/// Addresses to nym APIs from which the node gets the view of the network.
|
||||
pub nym_api_urls: Vec<Url>,
|
||||
|
||||
/// Addresses to nyxd validators via which the node can communicate with the chain directly.
|
||||
pub nyxd_urls: Vec<Url>,
|
||||
}
|
||||
|
||||
impl MixNode {
|
||||
@@ -242,6 +262,7 @@ impl MixNode {
|
||||
mix_port: DEFAULT_MIX_LISTENING_PORT,
|
||||
verloc_port: DEFAULT_VERLOC_LISTENING_PORT,
|
||||
nym_api_urls: vec![Url::from_str(mainnet::NYM_API).expect("Invalid default API URL")],
|
||||
nyxd_urls: vec![Url::from_str(mainnet::NYXD_URL).expect("Invalid default nyxd URL")],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -319,6 +340,10 @@ pub struct Debug {
|
||||
/// Maximum number of packets that can be stored waiting to get sent to a particular connection.
|
||||
pub maximum_connection_buffer_size: usize,
|
||||
|
||||
/// Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
/// on the next mix layer.
|
||||
pub enforce_forward_travel: bool,
|
||||
|
||||
/// Specifies whether the mixnode should be using the legacy framing for the sphinx packets.
|
||||
// it's set to true by default. The reason for that decision is to preserve compatibility with the
|
||||
// existing nodes whilst everyone else is upgrading and getting the code for handling the new field.
|
||||
@@ -335,6 +360,9 @@ impl Default for Debug {
|
||||
packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
|
||||
initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT,
|
||||
maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE,
|
||||
|
||||
// let's keep it disabled for now to not surprise operators/users
|
||||
enforce_forward_travel: false,
|
||||
use_legacy_framed_packet_version: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,6 +75,8 @@ impl ConfigV1_1_32 {
|
||||
|
||||
impl From<ConfigV1_1_32> for Config {
|
||||
fn from(value: ConfigV1_1_32) -> Self {
|
||||
let network = nym_config::defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
Config {
|
||||
// \/ ADDED
|
||||
save_path: None,
|
||||
@@ -104,6 +106,18 @@ impl From<ConfigV1_1_32> for Config {
|
||||
mix_port: value.mixnode.mix_port,
|
||||
verloc_port: value.mixnode.verloc_port,
|
||||
nym_api_urls: value.mixnode.nym_api_urls,
|
||||
|
||||
// \/ ADDED
|
||||
nyxd_urls: network
|
||||
.endpoints
|
||||
.into_iter()
|
||||
.map(|e| {
|
||||
e.nyxd_url
|
||||
.parse()
|
||||
.expect("malformed nyxd url in environment")
|
||||
})
|
||||
.collect(),
|
||||
// /\ ADDED
|
||||
},
|
||||
storage_paths: value.storage_paths,
|
||||
verloc: value.verloc.into(),
|
||||
@@ -243,6 +257,7 @@ impl From<DebugV1_1_32> for Debug {
|
||||
packet_forwarding_maximum_backoff: value.packet_forwarding_maximum_backoff,
|
||||
initial_connection_timeout: value.initial_connection_timeout,
|
||||
maximum_connection_buffer_size: value.maximum_connection_buffer_size,
|
||||
enforce_forward_travel: false,
|
||||
use_legacy_framed_packet_version: value.use_legacy_framed_packet_version,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,13 @@ nym_api_urls = [
|
||||
{{/each}}
|
||||
]
|
||||
|
||||
# Addresses to nyxd validators via which the node can communicate with the chain directly.
|
||||
nyxd_urls = [
|
||||
{{#each mixnode.nyxd_urls }}
|
||||
'{{this}}',
|
||||
{{/each}}
|
||||
]
|
||||
|
||||
[http]
|
||||
# Socket address this node will use for binding its http API.
|
||||
# default: `0.0.0.0:8000`
|
||||
@@ -80,4 +87,10 @@ node_description = '{{ storage_paths.node_description }}'
|
||||
|
||||
# TODO
|
||||
|
||||
[debug]
|
||||
|
||||
# Specifies whether this node should accepts and send out packets that would only go to nodes
|
||||
# on the next mix layer.
|
||||
enforce_forward_travel = {{ debug.enforce_forward_travel }}
|
||||
|
||||
"#;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_mixnode_common::forward_travel::error::ForwardTravelError;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use thiserror::Error;
|
||||
@@ -44,7 +45,24 @@ pub enum MixnodeError {
|
||||
source: io::Error,
|
||||
},
|
||||
|
||||
#[error("experienced an error during shutdown: {message}")]
|
||||
ShutdownFailure { message: String },
|
||||
|
||||
#[error("failure in enforcing forward travel of mix packets: {source}")]
|
||||
ForwardTravel {
|
||||
#[from]
|
||||
source: ForwardTravelError,
|
||||
},
|
||||
|
||||
// TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors
|
||||
#[error(transparent)]
|
||||
NymNodeError(#[from] nym_node::error::NymNodeError),
|
||||
}
|
||||
|
||||
impl MixnodeError {
|
||||
pub(crate) fn shutdown_failure(err: Box<dyn std::error::Error + Send + Sync>) -> Self {
|
||||
MixnodeError::ShutdownFailure {
|
||||
message: err.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,8 @@ use crate::node::listener::connection_handler::packet_processing::{
|
||||
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
|
||||
use crate::node::TaskClient;
|
||||
use futures::StreamExt;
|
||||
use log::debug;
|
||||
use log::{error, info, warn};
|
||||
use log::{debug, error, info, warn};
|
||||
use nym_mixnode_common::forward_travel::AllowedEgress;
|
||||
use nym_mixnode_common::measure;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
@@ -26,22 +26,37 @@ pub(crate) mod packet_processing;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ConnectionHandler {
|
||||
allowed_egress: AllowedEgress,
|
||||
packet_processor: PacketProcessor,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
}
|
||||
|
||||
impl ConnectionHandler {
|
||||
pub(crate) fn new(
|
||||
allowed_egress: AllowedEgress,
|
||||
packet_processor: PacketProcessor,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
) -> Self {
|
||||
ConnectionHandler {
|
||||
allowed_egress,
|
||||
packet_processor,
|
||||
delay_forwarding_channel,
|
||||
}
|
||||
}
|
||||
|
||||
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
|
||||
let next_hop: SocketAddr = mix_packet.next_hop().into();
|
||||
|
||||
// TODO: another option is to move this filter
|
||||
// (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times)
|
||||
// to the mixnet client where we could be filtering at the time of attempting to open new outbound connections
|
||||
// However, in that case we'd have gone through the troubles of possibly unnecessarily delaying the packet
|
||||
if !self.allowed_egress.is_allowed(next_hop.ip()) {
|
||||
// TODO: perhaps this should get lowered in severity?
|
||||
warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet");
|
||||
return;
|
||||
}
|
||||
|
||||
// determine instant at which packet should get forwarded. this way we minimise effect of
|
||||
// being stuck in the queue [of the channel] to get inserted into the delay queue
|
||||
let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration());
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::node::listener::connection_handler::ConnectionHandler;
|
||||
use log::{error, info, warn};
|
||||
use nym_mixnode_common::forward_travel::AllowedIngress;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::net::TcpListener;
|
||||
@@ -14,12 +15,21 @@ pub(crate) mod connection_handler;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
allowed_ingress: AllowedIngress,
|
||||
shutdown: TaskClient,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
|
||||
Listener { address, shutdown }
|
||||
pub(crate) fn new(
|
||||
address: SocketAddr,
|
||||
allowed_ingress: AllowedIngress,
|
||||
shutdown: TaskClient,
|
||||
) -> Self {
|
||||
Listener {
|
||||
address,
|
||||
allowed_ingress,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
||||
@@ -41,6 +51,12 @@ impl Listener {
|
||||
connection = listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
if !self.allowed_ingress.is_allowed(remote_addr.ip()) {
|
||||
// TODO: perhaps this should get lowered in severity?
|
||||
warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection");
|
||||
continue
|
||||
}
|
||||
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
}
|
||||
|
||||
+49
-8
@@ -15,7 +15,9 @@ use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSende
|
||||
use log::{error, info, warn};
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_bin_common::version_checker::parse_version;
|
||||
use nym_config::defaults::NymNetworkDetails;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress};
|
||||
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
|
||||
use nym_task::{TaskClient, TaskManager};
|
||||
use rand::seq::SliceRandom;
|
||||
@@ -101,6 +103,8 @@ impl MixNode {
|
||||
&self,
|
||||
node_stats_update_sender: node_statistics::UpdateSender,
|
||||
delay_forwarding_channel: PacketDelayForwardSender,
|
||||
ingress: AllowedIngress,
|
||||
egress: AllowedEgress,
|
||||
shutdown: TaskClient,
|
||||
) {
|
||||
info!("Starting socket listener...");
|
||||
@@ -108,14 +112,37 @@ impl MixNode {
|
||||
let packet_processor =
|
||||
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
|
||||
|
||||
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
|
||||
let connection_handler =
|
||||
ConnectionHandler::new(egress, packet_processor, delay_forwarding_channel);
|
||||
|
||||
let listening_address = SocketAddr::new(
|
||||
self.config.mixnode.listening_address,
|
||||
self.config.mixnode.mix_port,
|
||||
);
|
||||
|
||||
Listener::new(listening_address, shutdown).start(connection_handler);
|
||||
Listener::new(listening_address, ingress, shutdown).start(connection_handler);
|
||||
}
|
||||
|
||||
async fn start_allowed_addresses_provider(
|
||||
&self,
|
||||
task_client: TaskClient,
|
||||
) -> Result<(AllowedIngress, AllowedEgress), MixnodeError> {
|
||||
let identity = self.identity_keypair.public_key().to_base58_string();
|
||||
let nyxd_endpoints = self.config.mixnode.nyxd_urls.clone();
|
||||
|
||||
let network = NymNetworkDetails::new_from_env();
|
||||
let mut provider = AllowedAddressesProvider::new(
|
||||
identity,
|
||||
nyxd_endpoints,
|
||||
!self.config.debug.enforce_forward_travel,
|
||||
Some(network),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let filters = (provider.ingress(), provider.egress());
|
||||
|
||||
tokio::spawn(async move { provider.run(task_client).await });
|
||||
Ok(filters)
|
||||
}
|
||||
|
||||
fn start_packet_delay_forwarder(
|
||||
@@ -215,12 +242,16 @@ impl MixNode {
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_interrupt(&self, shutdown: TaskManager) {
|
||||
let _res = shutdown.catch_interrupt().await;
|
||||
log::info!("Stopping nym mixnode");
|
||||
async fn wait_for_interrupt(
|
||||
&self,
|
||||
shutdown: TaskManager,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let res = shutdown.catch_interrupt().await;
|
||||
log::info!("Stopping nym gateway");
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), MixnodeError> {
|
||||
pub(crate) async fn run(&mut self) -> Result<(), MixnodeError> {
|
||||
info!("Starting nym mixnode");
|
||||
|
||||
if self.check_if_bonded().await {
|
||||
@@ -229,6 +260,12 @@ impl MixNode {
|
||||
|
||||
let shutdown = TaskManager::default();
|
||||
|
||||
let (ingress, egress) = self
|
||||
.start_allowed_addresses_provider(
|
||||
shutdown.subscribe().named("AllowedAddressesProvider"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (node_stats_pointer, node_stats_update_sender) = self
|
||||
.start_node_stats_controller(shutdown.subscribe().named("node_statistics::Controller"));
|
||||
let delay_forwarding_channel = self.start_packet_delay_forwarder(
|
||||
@@ -238,6 +275,8 @@ impl MixNode {
|
||||
self.start_socket_listener(
|
||||
node_stats_update_sender,
|
||||
delay_forwarding_channel,
|
||||
ingress,
|
||||
egress,
|
||||
shutdown.subscribe().named("Listener"),
|
||||
);
|
||||
let atomic_verloc_results =
|
||||
@@ -253,7 +292,9 @@ impl MixNode {
|
||||
)?;
|
||||
|
||||
info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!");
|
||||
self.wait_for_interrupt(shutdown).await;
|
||||
Ok(())
|
||||
|
||||
self.wait_for_interrupt(shutdown)
|
||||
.await
|
||||
.map_err(MixnodeError::shutdown_failure)
|
||||
}
|
||||
}
|
||||
|
||||
+2
-2
@@ -38,13 +38,13 @@ serde_json = { workspace = true }
|
||||
tap = "1.0"
|
||||
thiserror = { workspace = true }
|
||||
time = { workspace = true, features = ["serde-human-readable", "parsing"] }
|
||||
tokio = { version = "1.24.1", features = [
|
||||
tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"macros",
|
||||
"signal",
|
||||
"time",
|
||||
] }
|
||||
tokio-stream = "0.1.11"
|
||||
tokio-stream = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
||||
ts-rs = { workspace = true, optional = true}
|
||||
|
||||
Generated
+4
-4
@@ -7448,9 +7448,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.31.0"
|
||||
version = "1.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
|
||||
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
@@ -7557,9 +7557,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.8"
|
||||
version = "0.7.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
|
||||
checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
|
||||
Generated
+2
-2
@@ -5797,9 +5797,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.31.0"
|
||||
version = "1.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
|
||||
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
|
||||
@@ -8,6 +8,7 @@ localnetdir="$HOME/.nym/localnets/localnet.$suffix"
|
||||
mkdir -p "$localnetdir"
|
||||
|
||||
echo "Using $localnetdir for the localnet"
|
||||
cargo build --release
|
||||
|
||||
# initialise mixnet
|
||||
echo "initialising mixnode1..."
|
||||
|
||||
@@ -48,8 +48,8 @@ nym-bin-common = { path = "../../../common/bin-common" }
|
||||
|
||||
# extra dependencies for libp2p examples
|
||||
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
|
||||
tokio-stream = "0.1.12"
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
parking_lot = "0.12"
|
||||
hex = "0.4"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user