Compare commits

...

21 Commits

Author SHA1 Message Date
Jędrzej Stuczyński c21b015fa3 changed filtering behaviour to always use ALL bonded mixnodes and gateways 2023-12-07 08:55:43 +00:00
Jędrzej Stuczyński dfb7b707e7 post-rebase clippy 2023-12-07 08:54:38 +00:00
Jędrzej Stuczyński 66daddac96 fixed gateway test 2023-12-07 08:54:38 +00:00
Jędrzej Stuczyński 06e7b95800 build all binaries before starting localnet 2023-12-07 08:54:38 +00:00
Jędrzej Stuczyński 8883f64bd6 added mixnode clap argument to set nyxd urls 2023-12-07 08:54:37 +00:00
Jędrzej Stuczyński d9f88ca515 ibid. for the gateway 2023-12-07 08:54:37 +00:00
Jędrzej Stuczyński 667d8d34eb an option to set whether mixnode should enforce forward travel policy 2023-12-07 08:54:37 +00:00
Jędrzej Stuczyński 6b0a743f31 helper on TaskClient to sleep for specified amount of time 2023-12-07 08:54:37 +00:00
Jędrzej Stuczyński e70303fcaa fixed mixnode config template 2023-12-07 08:54:37 +00:00
Jędrzej Stuczyński 7659db973f clippy 2023-12-07 08:54:36 +00:00
Jędrzej Stuczyński 72480543cb [gateway] using the egress filter for outgoing packets 2023-12-07 08:54:35 +00:00
Jędrzej Stuczyński e67e6a9838 [gateway] using the ingress filter for incoming (mix) connections 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński d9cfec125a [mixnode] using the egress filter for outgoing packets 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński a72414e359 using parking_lot::RwLock instead of tokio::RwLock 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński 81548eba3f setting initial provider values at construction 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński 527fc5dfdd [mixnode] using the ingress filter for incoming connections 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński 4fabb7a44c creating correct filters based on whether node is a gateway or isnt bonded at all 2023-12-07 08:53:18 +00:00
Jędrzej Stuczyński b401123d59 starting the provider inside the mixnode 2023-12-07 08:53:17 +00:00
Jędrzej Stuczyński 8f83fd784f module responsible for updating paths allowed for forward travel 2023-12-07 08:53:17 +00:00
Jędrzej Stuczyński 867e92c9f5 'thiserror' implementation for verloc error 2023-12-07 08:53:17 +00:00
Jędrzej Stuczyński 65895db582 chore: update tokio-related dependencies 2023-12-07 08:53:15 +00:00
47 changed files with 873 additions and 177 deletions
Generated
+2 -2
View File
@@ -6521,7 +6521,6 @@ dependencies = [
"nym-validator-client",
"nym-wireguard",
"once_cell",
"pretty_env_logger",
"rand 0.7.3",
"rand 0.8.5",
"serde",
@@ -6723,7 +6722,6 @@ dependencies = [
"nym-types",
"nym-validator-client",
"opentelemetry",
"pretty_env_logger",
"rand 0.7.3",
"serde",
"serde_json",
@@ -6748,6 +6746,7 @@ dependencies = [
"log",
"nym-bin-common",
"nym-crypto",
"nym-mixnet-contract-common",
"nym-network-defaults",
"nym-sphinx-acknowledgements",
"nym-sphinx-addressing",
@@ -6757,6 +6756,7 @@ dependencies = [
"nym-sphinx-types",
"nym-task",
"nym-validator-client",
"parking_lot 0.12.1",
"rand 0.8.5",
"serde",
"thiserror",
+3 -1
View File
@@ -162,8 +162,10 @@ serde_json = "1.0.91"
tap = "1.0.1"
time = "0.3.30"
thiserror = "1.0.48"
tokio = "1.24.1"
tokio = "1.33.0"
tokio-stream = "0.1.14"
tokio-tungstenite = "0.20.1"
tokio-util = "0.7.9"
tracing = "0.1.37"
tungstenite = { version = "0.20.1", default-features = false }
ts-rs = "7.0.0"
+2 -2
View File
@@ -47,7 +47,7 @@ nym-credential-storage = { path = "../credential-storage" }
nym-network-defaults = { path = "../network-defaults" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.11"
workspace = true
features = ["time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
@@ -55,7 +55,7 @@ workspace = true
features = ["time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
version = "0.20.1"
workspace = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
version = "0.6.2"
+1 -1
View File
@@ -40,7 +40,7 @@ workspace = true
features = ["macros", "rt", "net", "sync", "time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.11"
workspace = true
features = ["net", "sync", "time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
+1 -1
View File
@@ -10,7 +10,7 @@ edition = "2021"
futures = { workspace = true }
log = { workspace = true }
tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-util = { workspace = true, features = ["codec"] }
# internal
nym-sphinx = { path = "../../nymsphinx" }
+6 -11
View File
@@ -123,13 +123,10 @@ impl Client {
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
warn!("Failed to forward packets to {address} - {err}");
}
debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
);
debug!("connection manager to {address} is finished. Either the connection failed or mixnet client got dropped");
}
/// If we're trying to reconnect, determine how long we should wait.
@@ -207,7 +204,7 @@ impl SendWithoutResponse for Client {
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
@@ -215,10 +212,8 @@ impl SendWithoutResponse for Client {
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
debug!("Connection to {address} seems to be dead. attempting to re-establish it...");
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
@@ -238,7 +233,7 @@ impl SendWithoutResponse for Client {
}
} else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
debug!("establishing initial connection to {address}");
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
@@ -55,6 +55,9 @@ pub use tendermint_rpc::{
};
pub use tendermint_rpc::{Request, Response, SimpleRequest};
#[cfg(feature = "http-client")]
pub use tendermint_rpc::Url as RpcUrl;
#[cfg(feature = "http-client")]
use crate::http_client;
#[cfg(feature = "http-client")]
@@ -623,6 +623,24 @@ pub enum Layer {
Three = 3,
}
impl Layer {
pub fn try_next(&self) -> Option<Self> {
match self {
Layer::One => Some(Layer::Two),
Layer::Two => Some(Layer::Three),
Layer::Three => None,
}
}
pub fn try_previous(&self) -> Option<Self> {
match self {
Layer::One => None,
Layer::Two => Some(Layer::One),
Layer::Three => Some(Layer::Two),
}
}
}
impl From<Layer> for String {
fn from(layer: Layer) -> Self {
(layer as u8).to_string()
+6 -4
View File
@@ -12,20 +12,21 @@ futures = { workspace = true }
humantime-serde = "1.0"
log = { workspace = true }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.24.1", features = [
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = [
"time",
"macros",
"rt",
"net",
"io-util",
] }
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-util = { workspace = true, features = ["codec"] }
url = { workspace = true }
thiserror = { workspace = true }
parking_lot = { workspace = true }
## tracing
tracing = { version = "0.1.37", optional = true }
tracing = { workspace = true, optional = true }
nym-crypto = { path = "../crypto" }
nym-network-defaults = { path = "../network-defaults" }
@@ -37,6 +38,7 @@ nym-sphinx-params = { path = "../nymsphinx/params" }
nym-sphinx-types = { path = "../nymsphinx/types" }
nym-task = { path = "../task" }
nym-validator-client = { path = "../client-libs/validator-client" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
nym-bin-common = { path = "../bin-common" }
cfg-if = "1.0.0"
@@ -0,0 +1,27 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_validator_client::nyxd::error::NyxdError;
use std::net::IpAddr;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ForwardTravelError {
#[error("received a connection request from a forbidden address: '{address}'")]
DisallowedIngressAddress { address: IpAddr },
#[error("received a request to open connection to a forbidden address: '{address}'")]
DisallowedEgressAddress { address: IpAddr },
#[error("no valid nyxd urls are available for topology queries")]
NoNyxdUrlsAvailable,
#[error("nyxd interaction failure: {source}")]
NyxdFailure {
#[from]
source: NyxdError,
},
#[error("the current epoch appears to be stuck")]
StuckEpoch,
}
@@ -0,0 +1,356 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::forward_travel::error::ForwardTravelError;
use log::{debug, error, info, trace, warn};
use nym_mixnet_contract_common::{EpochId, GatewayBond, Layer, MixNodeBond};
use nym_network_defaults::NymNetworkDetails;
use nym_task::TaskClient;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::contract_traits::{MixnetQueryClient, PagedMixnetQueryClient};
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::collections::HashSet;
use std::mem;
use std::net::{IpAddr, ToSocketAddrs};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use url::Url;
pub mod error;
// TODO: to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change
pub type AllowedIngress = AllowedPaths;
pub type AllowedEgress = AllowedPaths;
pub struct AllowedAddressesProvider {
current_epoch: EpochId,
identity: IdentityKey,
client_config: nyxd::Config,
/// URLs to the nyxd validators for obtaining unfiltered network topology.
nyxd_endpoints: Vec<Url>,
// to allow for separate ingress/egress we have to change our layer selection algorithm, i.e. it has to be announced before epoch change
// ingress: AllowedIngress,
// egress: AllowedEgress,
allowed: AllowedPaths,
}
#[allow(dead_code)]
impl AllowedAddressesProvider {
pub async fn new(
identity: IdentityKey,
nyxd_endpoints: Vec<Url>,
allow_all: bool,
network_details: Option<NymNetworkDetails>,
) -> Result<Self, ForwardTravelError> {
let network = network_details.unwrap_or(NymNetworkDetails::new_mainnet());
let mut provider = AllowedAddressesProvider {
current_epoch: 0,
identity,
client_config: nyxd::Config::try_from_nym_network_details(&network)?,
nyxd_endpoints,
allowed: AllowedPaths::new(allow_all),
};
if !allow_all {
// set initial values for ingress/egress
let client = provider.ephemeral_nyxd_client()?;
provider.update_state(client).await?;
}
Ok(provider)
}
fn ephemeral_nyxd_client(&self) -> Result<QueryHttpRpcNyxdClient, ForwardTravelError> {
let mut possible_nyxd_endpoints = self.nyxd_endpoints.clone();
possible_nyxd_endpoints.shuffle(&mut thread_rng());
let mut last_error = match QueryHttpRpcNyxdClient::connect(
self.client_config.clone(),
possible_nyxd_endpoints
.pop()
.ok_or(ForwardTravelError::NoNyxdUrlsAvailable)?
.as_str(),
) {
Ok(client) => return Ok(client),
Err(err) => err,
};
for url in possible_nyxd_endpoints {
match QueryHttpRpcNyxdClient::connect(self.client_config.clone(), url.as_str()) {
Ok(client) => return Ok(client),
Err(err) => last_error = err,
};
}
Err(last_error.into())
}
pub fn ingress(&self) -> AllowedIngress {
self.allowed.clone()
}
pub fn egress(&self) -> AllowedEgress {
self.allowed.clone()
}
fn add_node_ips(raw_host: &str, identity: &str, set: &mut HashSet<IpAddr>) {
if let Ok(ip) = IpAddr::from_str(raw_host) {
set.insert(ip);
} else {
// this might still be a valid hostname
//
// annoyingly there exists a method of looking up a socket address but not an ip address,
// so append any port and perform the lookup
let Ok(sockets) = format!("{raw_host}:1789").to_socket_addrs() else {
warn!("failed to resolve ip address of node '{identity}' (hostname: '{raw_host}')");
return;
};
for socket in sockets {
set.insert(socket.ip());
}
}
}
fn get_all_addresses(nodes: &[MixNodeBond], gateways: &[GatewayBond]) -> HashSet<IpAddr> {
let mut allowed = HashSet::new();
for node in nodes.iter() {
Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed);
}
for gateway in gateways.iter() {
Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed);
}
allowed
}
/// Gets ip addresses of all mixnodes on given layer
fn get_addresses_on_layer(layer: Layer, nodes: &[MixNodeBond]) -> HashSet<IpAddr> {
let mut allowed = HashSet::new();
for node in nodes.iter().filter(|m| m.layer == layer) {
Self::add_node_ips(&node.mix_node.host, node.identity(), &mut allowed);
}
allowed
}
fn gateway_addresses(gateways: &[GatewayBond]) -> HashSet<IpAddr> {
let mut allowed = HashSet::new();
for gateway in gateways.iter() {
Self::add_node_ips(&gateway.gateway.host, gateway.identity(), &mut allowed);
}
allowed
}
fn locate_layer(&self, nodes: &[MixNodeBond]) -> Option<Layer> {
nodes
.iter()
.find(|m| m.identity() == self.identity)
.map(|m| m.layer)
}
fn is_gateway(&self, gateways: &[GatewayBond]) -> bool {
gateways
.iter()
.any(|g| g.gateway.identity_key == self.identity)
}
async fn update_state(
&mut self,
client: QueryHttpRpcNyxdClient,
) -> Result<(), ForwardTravelError> {
let current_interval = client.get_current_interval_details().await?;
let current_epoch = current_interval.interval.current_epoch_absolute_id();
if current_epoch == self.current_epoch {
error!("can't update the allowed ips list as the epoch appears to be stuck");
return Err(ForwardTravelError::StuckEpoch);
}
let has_epoch_deviated = current_epoch > self.current_epoch + 1;
let mixnodes = client.get_all_mixnode_bonds().await?;
let gateways = client.get_all_gateways().await?;
let new_allowed = Self::get_all_addresses(&mixnodes, &gateways);
// I'm leaving this code commented out to preserve this logic for when we need it
// to update to proper ingress/egress filtering
// let our_mix_layer = self.locate_layer(&mixnodes);
//
// let previous_mix_layer = our_mix_layer.and_then(|l| l.try_previous());
// let next_mix_layer = our_mix_layer.and_then(|l| l.try_next());
//
// let (allowed_ingress, allowed_egress) = match (previous_mix_layer, next_mix_layer) {
// // layer 1
// (None, Some(next)) => {
// let gateways = client.get_all_gateways().await?;
//
// (
// Self::gateway_addresses(&gateways),
// Self::get_addresses_on_layer(next, &mixnodes),
// )
// }
// // layer 2
// (Some(previous), Some(next)) => (
// Self::get_addresses_on_layer(previous, &mixnodes),
// Self::get_addresses_on_layer(next, &mixnodes),
// ),
// // layer 3
// (Some(previous), None) => {
// let gateways = client.get_all_gateways().await?;
// (
// Self::get_addresses_on_layer(previous, &mixnodes),
// Self::gateway_addresses(&gateways),
// )
// }
// // gateway (or not bonded)
// (None, None) => {
// let gateways = client.get_all_gateways().await?;
//
// if self.is_gateway(&gateways) {
// let mut base_ingress = Self::get_addresses_on_layer(Layer::Three, &mixnodes);
// let mut base_egress = Self::get_addresses_on_layer(Layer::One, &mixnodes);
//
// // TODO: this extension should be conditional on whether the node is running the vpn module
// let gw_extension = Self::gateway_addresses(&gateways);
//
// base_ingress.extend(gw_extension.clone());
// base_egress.extend(gw_extension.clone());
//
// (base_ingress, base_egress)
// } else {
// warn!("our node doesn't appear to be bonded - going to permit traffic from ALL mixnodes and gateways");
// let all = Self::get_all_addresses(&mixnodes, &gateways);
// (all.clone(), all)
// }
// }
// };
self.current_epoch = current_epoch;
self.allowed.advance_epoch(new_allowed, has_epoch_deviated);
Ok(())
}
async fn update_allowed_addresses(&mut self) -> Result<(), ForwardTravelError> {
// create new client every epoch because it results in different nyxd endpoint being used
// what may help in distributing the load
let client = self.ephemeral_nyxd_client()?;
self.wait_for_epoch_rollover(&client).await?;
self.update_state(client).await
}
async fn wait_for_epoch_rollover(
&self,
client: &QueryHttpRpcNyxdClient,
) -> Result<(), ForwardTravelError> {
let current_interval = client.get_current_interval_details().await?;
let current_epoch = current_interval.interval.current_epoch_absolute_id();
if current_epoch <= self.current_epoch {
let remaining = current_interval.time_until_current_epoch_end();
// add few more seconds to account for block time drift and to spread queries of all
// other nodes
let adjustment_secs = rand::thread_rng().gen_range(5..90);
sleep(remaining + Duration::from_secs(adjustment_secs)).await;
}
Ok(())
}
pub async fn run(&mut self, mut task_client: TaskClient) {
if self.allowed.allow_all {
// debug_assert!(self.egress.allow_all);
info!("the forward travel is currently disabled - there's no point in starting the route refresher");
task_client.mark_as_success();
return;
}
debug!("Started ValidAddressesProvider with graceful shutdown support");
while !task_client.is_shutdown() {
tokio::select! {
biased;
_ = task_client.recv() => {
trace!("ValidAddressesProvider: Received shutdown");
}
res = self.update_allowed_addresses() => {
if let Err(err) = res {
warn!("failed to update the allowed addresses: {err}");
// don't retry immediately in case it was a network failure, wait a bit instead.
task_client.wait(Duration::from_secs(5 * 60)).await
}
}
}
}
task_client.recv_timeout().await;
log::debug!("ValidAddressesProvider: Exiting");
}
}
#[derive(Clone)]
pub struct AllowedPaths {
// this is fine that this value is not wrapped in an Arc and is not atomic given
// it's not expected to be modified at runtime
allow_all: bool,
inner: Arc<RwLock<AllowedPathsInner>>,
}
impl AllowedPaths {
fn new(allow_all: bool) -> Self {
AllowedPaths {
allow_all,
inner: Arc::new(RwLock::new(AllowedPathsInner {
previous_epoch: HashSet::new(),
current_epoch: HashSet::new(),
})),
}
}
pub fn is_allowed(&self, address: IpAddr) -> bool {
if self.allow_all {
return true;
}
let guard = self.inner.read();
guard.current_epoch.contains(&address) || guard.previous_epoch.contains(&address)
}
fn advance_epoch(&self, current_epoch: HashSet<IpAddr>, reset_previous: bool) {
// if this is triggered, it's an implementation bug;
// we shouldn't be updating data if we're allowing everything regardless
debug_assert!(!self.allow_all);
let mut guard = self.inner.write();
let old_current = mem::replace(&mut guard.current_epoch, current_epoch);
if reset_previous {
guard.previous_epoch = HashSet::new()
} else {
guard.previous_epoch = old_current;
}
}
}
struct AllowedPathsInner {
previous_epoch: HashSet<IpAddr>,
current_epoch: HashSet<IpAddr>,
}
+3 -1
View File
@@ -1,5 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod forward_travel;
pub mod packet_processor;
pub mod verloc;
+27 -58
View File
@@ -1,81 +1,50 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::fmt::{self, Display, Formatter};
use std::io;
use thiserror::Error;
#[derive(Debug)]
#[derive(Debug, Error)]
pub enum RttError {
#[error("the received echo packet had unexpected size")]
UnexpectedEchoPacketSize,
#[error("the received reply packet had unexpected size")]
UnexpectedReplyPacketSize,
#[error("the received echo packet had malformed sender")]
MalformedSenderIdentity,
#[error("the received echo packet had malformed signature")]
MalformedEchoSignature,
#[error("the received reply packet had malformed signature")]
MalformedReplySignature,
#[error("the received echo packet had invalid signature")]
InvalidEchoSignature,
#[error("the received reply packet had invalid signature")]
InvalidReplySignature,
UnreachableNode(String, io::Error),
UnexpectedConnectionFailureWrite(String, io::Error),
UnexpectedConnectionFailureRead(String, io::Error),
#[error("could not establish connection to {0}: {1}")]
UnreachableNode(String, #[source] io::Error),
#[error("failed to write echo packet to {0}: {1}")]
UnexpectedConnectionFailureWrite(String, #[source] io::Error),
#[error("failed to read reply packet from {0}: {1}")]
UnexpectedConnectionFailureRead(String, #[source] io::Error),
#[error("timed out while trying to read reply packet from {0}")]
ConnectionReadTimeout(String),
#[error("timed out while trying to write echo packet to {0}")]
ConnectionWriteTimeout(String),
#[error("the received reply packet had an unexpected sequence number")]
UnexpectedReplySequence,
#[error("shutdown signal received")]
ShutdownReceived,
}
impl Display for RttError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
RttError::UnexpectedEchoPacketSize => {
write!(f, "The received echo packet had unexpected size")
}
RttError::UnexpectedReplyPacketSize => {
write!(f, "The received reply packet had unexpected size")
}
RttError::MalformedSenderIdentity => {
write!(f, "The received echo packet had malformed sender")
}
RttError::MalformedEchoSignature => {
write!(f, "The received echo packet had malformed signature")
}
RttError::MalformedReplySignature => {
write!(f, "The received reply packet had malformed signature")
}
RttError::InvalidEchoSignature => {
write!(f, "The received echo packet had invalid signature")
}
RttError::InvalidReplySignature => {
write!(f, "The received reply packet had invalid signature")
}
RttError::UnreachableNode(id, err) => {
write!(f, "Could not establish connection to {id} - {err}")
}
RttError::UnexpectedConnectionFailureWrite(id, err) => {
write!(f, "Failed to write echo packet to {id} - {err}")
}
RttError::UnexpectedConnectionFailureRead(id, err) => {
write!(f, "Failed to read reply packet from {id} - {err}")
}
RttError::ConnectionReadTimeout(id) => {
write!(f, "Timed out while trying to read reply packet from {id}")
}
RttError::ConnectionWriteTimeout(id) => {
write!(f, "Timed out while trying to write echo packet to {id}")
}
RttError::UnexpectedReplySequence => write!(
f,
"The received reply packet had an unexpected sequence number"
),
RttError::ShutdownReceived => {
write!(f, "Shutdown signal received")
}
}
}
}
impl std::error::Error for RttError {}
+2 -2
View File
@@ -7,13 +7,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio-stream = "0.1.11" # this one seems to be a thing until `Stream` trait is stabilised in stdlib
tokio-stream = { workspace = true } # this one seems to be a thing until `Stream` trait is stabilised in stdlib
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-util]
version = "0.7.4"
workspace = true
features = ["time"]
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
+1 -1
View File
@@ -9,7 +9,7 @@ repository = { workspace = true }
[dependencies]
bytes = "1.0"
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
+1 -1
View File
@@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
bytes = "1.0"
tokio = { version = "1.24.1", features = [ "net", "io-util", "sync", "macros", "time", "rt-multi-thread" ] }
tokio-util = { version = "0.7.4", features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use
tokio-util = { workspace = true, features = [ "io" ] } # reason for getting this guy is to to able to port to tokio 1.X more quickly by being able to use
# their `read_buf` [from the util crate] replacement rather than having to rethink/reimplement `AvailableReader` with the new AsyncRead trait definition.
# In the long run, the dependency should probably get removed in favour of pure-tokio implementation, but for time being it's fine.
futures = { workspace = true }
+8
View File
@@ -373,6 +373,14 @@ impl TaskClient {
}
}
pub async fn wait(&mut self, duration: Duration) {
tokio::select! {
biased;
_ = self.recv() => (),
_ = sleep(duration) => (),
}
}
// Create a dummy that will never report that we should shutdown.
pub fn dummy() -> TaskClient {
let (_notify_tx, notify_rx) = watch::channel(());
+2 -2
View File
@@ -45,9 +45,9 @@ serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0.149"
serde_json = "1.0.91"
thiserror = { workspace = true }
tokio = { version = "1", features = ["macros", "net","rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "net","rt-multi-thread"] }
tokio-tungstenite = { workspace = true }
tokio-util = { version = "0.7.4", features = ["full"] }
tokio-util = { workspace = true, features = ["full"] }
toml = "0.7.0"
unsigned-varint = "0.7.1"
utoipa = { workspace = true, features = ["actix_extras"] }
+4 -5
View File
@@ -31,8 +31,7 @@ humantime-serde = "1.0.1"
ipnetwork = "0.16"
lazy_static = "1.4.0"
log = { workspace = true }
once_cell = "1.7.2"
pretty_env_logger = "0.4"
once_cell = { workspace = true }
rand = "0.7"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -51,9 +50,9 @@ tokio = { workspace = true, features = [
"fs",
"time",
] }
tokio-stream = { version = "0.1.11", features = ["fs"] }
tokio-tungstenite = { version = "0.20.1" }
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-stream = { workspace = true, features = ["fs"] }
tokio-tungstenite = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
url = { workspace = true, features = ["serde"] }
zeroize = { workspace = true }
+5
View File
@@ -38,6 +38,7 @@ pub(crate) struct OverrideConfig {
pub(crate) statistics_service_url: Option<url::Url>,
pub(crate) nym_apis: Option<Vec<url::Url>>,
pub(crate) mnemonic: Option<bip39::Mnemonic>,
pub(crate) enforce_forward_travel: Option<bool>,
pub(crate) nyxd_urls: Option<Vec<url::Url>>,
pub(crate) only_coconut_credentials: Option<bool>,
pub(crate) with_network_requester: Option<bool>,
@@ -52,6 +53,10 @@ impl OverrideConfig {
.with_optional(Config::with_listening_address, self.listening_address)
.with_optional(Config::with_mix_port, self.mix_port)
.with_optional(Config::with_clients_port, self.clients_port)
.with_optional(
Config::with_enforce_forward_travel,
self.enforce_forward_travel,
)
.with_optional_custom_env(
Config::with_custom_nym_apis,
self.nym_apis,
+30 -23
View File
@@ -20,41 +20,41 @@ use super::helpers::OverrideIpPacketRouterConfig;
#[derive(Args, Clone)]
pub struct Init {
/// Id of the gateway we want to create config for
#[clap(long)]
#[arg(long)]
id: String,
/// The listening address on which the gateway will be receiving sphinx packets and listening for client data
#[clap(long, alias = "host")]
#[arg(long, alias = "host")]
listening_address: IpAddr,
/// Comma separated list of public ip addresses that will announced to the nym-api and subsequently to the clients.
/// In nearly all circumstances, it's going to be identical to the address you're going to use for bonding.
#[clap(long, value_delimiter = ',')]
#[arg(long, value_delimiter = ',')]
public_ips: Option<Vec<IpAddr>>,
/// Optional hostname associated with this gateway that will announced to the nym-api and subsequently to the clients
#[clap(long)]
#[arg(long)]
hostname: Option<String>,
/// The port on which the gateway will be listening for sphinx packets
#[clap(long)]
#[arg(long)]
mix_port: Option<u16>,
/// The port on which the gateway will be listening for clients gateway-requests
#[clap(long)]
#[arg(long)]
clients_port: Option<u16>,
/// Path to sqlite database containing all gateway persistent data
#[clap(long)]
#[arg(long)]
datastore: Option<PathBuf>,
/// Comma separated list of endpoints of nym APIs
#[clap(long, alias = "validator_apis", value_delimiter = ',')]
#[arg(long, alias = "validator_apis", value_delimiter = ',')]
// the alias here is included for backwards compatibility (1.1.4 and before)
nym_apis: Option<Vec<url::Url>>,
/// Comma separated list of endpoints of the validator
#[clap(
#[arg(
long,
alias = "validators",
alias = "nyxd_validators",
@@ -65,47 +65,52 @@ pub struct Init {
nyxd_urls: Option<Vec<url::Url>>,
/// Cosmos wallet mnemonic needed for double spending protection
#[clap(long)]
#[arg(long)]
mnemonic: Option<bip39::Mnemonic>,
/// Set this gateway to work only with coconut credentials; that would disallow clients to
/// bypass bandwidth credential requirement
#[clap(long, hide = true)]
#[arg(long, hide = true)]
only_coconut_credentials: Option<bool>,
/// Enable/disable gateway anonymized statistics that get sent to a statistics aggregator server
#[clap(long)]
#[arg(long)]
enabled_statistics: Option<bool>,
/// URL where a statistics aggregator is running. The default value is a Nym aggregator server
#[clap(long)]
#[arg(long)]
statistics_service_url: Option<url::Url>,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer
#[arg(long)]
enforce_forward_travel: bool,
/// Allows this gateway to run an embedded network requester for minimal network overhead
#[clap(long, conflicts_with = "with_ip_packet_router")]
#[arg(long, conflicts_with = "with_ip_packet_router")]
with_network_requester: bool,
/// Allows this gateway to run an embedded network requester for minimal network overhead
#[clap(long, hide = true, conflicts_with = "with_network_requester")]
#[arg(long, hide = true, conflicts_with = "with_network_requester")]
with_ip_packet_router: bool,
// ##### NETWORK REQUESTER FLAGS #####
/// Specifies whether this network requester should run in 'open-proxy' mode
#[clap(long, requires = "with_network_requester")]
#[arg(long, requires = "with_network_requester")]
open_proxy: Option<bool>,
/// Enable service anonymized statistics that get sent to a statistics aggregator server
#[clap(long, requires = "with_network_requester")]
#[arg(long, requires = "with_network_requester")]
enable_statistics: Option<bool>,
/// Mixnet client address where a statistics aggregator is running. The default value is a Nym
/// aggregator client
#[clap(long, requires = "with_network_requester")]
#[arg(long, requires = "with_network_requester")]
statistics_recipient: Option<String>,
/// Mostly debug-related option to increase default traffic rate so that you would not need to
/// modify config post init
#[clap(
#[arg(
long,
hide = true,
conflicts_with = "medium_toggle",
@@ -114,7 +119,7 @@ pub struct Init {
fastmode: bool,
/// Disable loop cover traffic and the Poisson rate limiter (for debugging only)
#[clap(
#[arg(
long,
hide = true,
conflicts_with = "medium_toggle",
@@ -124,7 +129,7 @@ pub struct Init {
/// Enable medium mixnet traffic, for experiments only.
/// This includes things like disabling cover traffic, no per hop delays, etc.
#[clap(
#[arg(
long,
hide = true,
conflicts_with = "no_cover",
@@ -136,10 +141,10 @@ pub struct Init {
/// Specifies whether this network requester will run using the default ExitPolicy
/// as opposed to the allow list.
/// Note: this setting will become the default in the future releases.
#[clap(long)]
#[arg(long)]
with_exit_policy: Option<bool>,
#[clap(short, long, default_value_t = OutputFormat::default())]
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
@@ -154,6 +159,7 @@ impl From<Init> for OverrideConfig {
datastore: init_config.datastore,
nym_apis: init_config.nym_apis,
mnemonic: init_config.mnemonic,
enforce_forward_travel: Some(init_config.enforce_forward_travel),
enabled_statistics: init_config.enabled_statistics,
statistics_service_url: init_config.statistics_service_url,
@@ -302,6 +308,7 @@ mod tests {
no_cover: false,
medium_toggle: false,
with_exit_policy: None,
enforce_forward_travel: false,
};
std::env::set_var(BECH32_PREFIX, "n");
+6
View File
@@ -108,6 +108,11 @@ pub struct Run {
#[arg(long)]
statistics_recipient: Option<String>,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer
#[arg(long)]
enforce_forward_travel: Option<bool>,
/// Mostly debug-related option to increase default traffic rate so that you would not need to
/// modify config post init
#[arg(long, hide = true, conflicts_with = "medium_toggle")]
@@ -157,6 +162,7 @@ impl From<Run> for OverrideConfig {
datastore: run_config.datastore,
nym_apis: run_config.nym_apis,
mnemonic: run_config.mnemonic,
enforce_forward_travel: run_config.enforce_forward_travel,
enabled_statistics: run_config.enabled_statistics,
statistics_service_url: run_config.statistics_service_url,
+27 -1
View File
@@ -188,11 +188,13 @@ impl Config {
self
}
#[must_use]
pub fn with_enabled_network_requester(mut self, enabled_network_requester: bool) -> Self {
self.network_requester.enabled = enabled_network_requester;
self
}
#[must_use]
pub fn with_default_network_requester_config_path(mut self) -> Self {
self.storage_paths = self
.storage_paths
@@ -212,36 +214,43 @@ impl Config {
self
}
#[must_use]
pub fn with_only_coconut_credentials(mut self, only_coconut_credentials: bool) -> Self {
self.gateway.only_coconut_credentials = only_coconut_credentials;
self
}
#[must_use]
pub fn with_enabled_statistics(mut self, enabled_statistics: bool) -> Self {
self.gateway.enabled_statistics = enabled_statistics;
self
}
#[must_use]
pub fn with_custom_statistics_service_url(mut self, statistics_service_url: Url) -> Self {
self.gateway.statistics_service_url = statistics_service_url;
self
}
#[must_use]
pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec<Url>) -> Self {
self.gateway.nym_api_urls = nym_api_urls;
self
}
#[must_use]
pub fn with_custom_validator_nyxd(mut self, validator_nyxd_urls: Vec<Url>) -> Self {
self.gateway.nyxd_urls = validator_nyxd_urls;
self
}
#[must_use]
pub fn with_cosmos_mnemonic(mut self, cosmos_mnemonic: bip39::Mnemonic) -> Self {
self.gateway.cosmos_mnemonic = cosmos_mnemonic;
self
}
#[must_use]
pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self {
self.gateway.listening_address = listening_address;
@@ -253,16 +262,25 @@ impl Config {
self
}
#[must_use]
pub fn with_mix_port(mut self, port: u16) -> Self {
self.gateway.mix_port = port;
self
}
#[must_use]
pub fn with_clients_port(mut self, port: u16) -> Self {
self.gateway.clients_port = port;
self
}
#[must_use]
pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self {
self.debug.enforce_forward_travel = forward_travel;
self
}
#[must_use]
pub fn with_custom_persistent_store(mut self, store_dir: PathBuf) -> Self {
self.storage_paths.clients_storage = store_dir;
self
@@ -328,7 +346,8 @@ pub struct Gateway {
#[zeroize(skip)]
pub nym_api_urls: Vec<Url>,
/// Addresses to validators which the node uses to check for double spending of ERC20 tokens.
/// Addresses to nyxd validators via which the node can communicate with the chain directly,
/// including for checking for double spending of coconut credentials.
#[serde(alias = "validator_nymd_urls")]
#[zeroize(skip)]
pub nyxd_urls: Vec<Url>,
@@ -421,6 +440,10 @@ pub struct Debug {
/// Number of messages from offline client that can be pulled at once from the storage.
pub message_retrieval_limit: i64,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer.
pub enforce_forward_travel: bool,
/// Specifies whether the mixnode should be using the legacy framing for the sphinx packets.
// it's set to true by default. The reason for that decision is to preserve compatibility with the
// existing nodes whilst everyone else is upgrading and getting the code for handling the new field.
@@ -438,6 +461,9 @@ impl Default for Debug {
maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE,
stored_messages_filename_length: DEFAULT_STORED_MESSAGE_FILENAME_LENGTH,
message_retrieval_limit: DEFAULT_MESSAGE_RETRIEVAL_LIMIT,
// let's keep it disabled for now to not surprise operators/users
enforce_forward_travel: false,
use_legacy_framed_packet_version: false,
}
}
+1
View File
@@ -163,6 +163,7 @@ impl From<ConfigV1_1_31> for Config {
presence_sending_delay: value.debug.presence_sending_delay,
stored_messages_filename_length: value.debug.stored_messages_filename_length,
message_retrieval_limit: value.debug.message_retrieval_limit,
enforce_forward_travel: false,
use_legacy_framed_packet_version: value.debug.use_legacy_framed_packet_version,
},
}
+6
View File
@@ -116,4 +116,10 @@ ip_packet_router_config = '{{ storage_paths.ip_packet_router_config }}'
# TODO
[debug]
# Specifies whether this node should accepts and send out packets that would only go to nodes
# on the next mix layer.
enforce_forward_travel = {{ debug.enforce_forward_travel }}
"#;
+7
View File
@@ -3,6 +3,7 @@
use crate::node::storage::error::StorageError;
use nym_ip_packet_router::error::IpPacketRouterError;
use nym_mixnode_common::forward_travel::error::ForwardTravelError;
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::nyxd::AccountId;
@@ -131,6 +132,12 @@ pub(crate) enum GatewayError {
source: NyxdError,
},
#[error("failure in enforcing forward travel of mix packets: {source}")]
ForwardTravel {
#[from]
source: ForwardTravelError,
},
// TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors
#[error(transparent)]
NymNodeError(#[from] nym_node::error::NymNodeError),
@@ -15,12 +15,12 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_validator_client::coconut::CoconutApiError;
use rand::{CryptoRng, Rng};
use std::net::SocketAddr;
use std::{convert::TryFrom, process, time::Duration};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
use std::{convert::TryFrom, process, time::Duration};
use crate::node::{
client_handling::{
bandwidth::Bandwidth,
@@ -40,13 +40,13 @@ pub(crate) enum RequestHandlingError {
#[error("Internal gateway storage error")]
StorageError(#[from] StorageError),
#[error("Provided bandwidth IV is malformed - {0}")]
#[error("Provided bandwidth IV is malformed: {0}")]
MalformedIV(#[from] IVConversionError),
#[error("Provided binary request was malformed - {0}")]
#[error("Provided binary request was malformed: {0}")]
InvalidBinaryRequest(#[from] GatewayRequestsError),
#[error("Provided binary request was malformed - {0}")]
#[error("Provided binary request was malformed: {0}")]
InvalidTextRequest(<ClientControlRequest as TryFrom<String>>::Error),
#[error("The received request is not valid in the current context")]
@@ -61,10 +61,10 @@ pub(crate) enum RequestHandlingError {
#[error("This gateway is only accepting coconut credentials for bandwidth")]
OnlyCoconutCredentials,
#[error("Nyxd Error - {0}")]
#[error("Nyxd Error: {0}")]
NyxdError(#[from] nym_validator_client::nyxd::error::NyxdError),
#[error("Validator API error - {0}")]
#[error("Validator API error: {0}")]
APIError(#[from] nym_validator_client::ValidatorClientError),
#[error("Not enough nym API endpoints provided. Needed {needed}, received {received}")]
@@ -73,14 +73,17 @@ pub(crate) enum RequestHandlingError {
#[error("There was a problem with the proposal id: {reason}")]
ProposalIdError { reason: String },
#[error("Coconut interface error - {0}")]
#[error("Coconut interface error: {0}")]
CoconutInterfaceError(#[from] nym_coconut_interface::error::CoconutInterfaceError),
#[error("coconut api query failure: {0}")]
CoconutApiError(#[from] CoconutApiError),
#[error("Credential error - {0}")]
#[error("Credential error: {0}")]
CredentialError(#[from] nym_credentials::error::Error),
#[error("the outbound address of the received packet ('{address}') is forbidden as there are no nodes with that address on the next layer")]
InvalidForwardHop { address: SocketAddr },
}
impl RequestHandlingError {
@@ -203,11 +206,25 @@ where
/// # Arguments
///
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
fn forward_packet(&self, mix_packet: MixPacket) {
fn forward_packet(&self, mix_packet: MixPacket) -> Result<(), RequestHandlingError> {
let next_hop: SocketAddr = mix_packet.next_hop().into();
// TODO: another option is to move this filter
// (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times)
// to packet forwarder where we could be filtering at the time of attempting to open new outbound connections
// However, in that case we wouldn't be able to return an error message to the client
if !self.inner.allowed_egress.is_allowed(next_hop.ip()) {
// TODO: perhaps this should get lowered in severity?
warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet");
return Err(RequestHandlingError::InvalidForwardHop { address: next_hop });
}
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
process::exit(1);
}
Ok(())
}
/// Tries to handle the received bandwidth request by checking correctness of the received data
@@ -317,7 +334,7 @@ where
}
self.consume_bandwidth(consumed_bandwidth).await?;
self.forward_packet(mix_packet);
self.forward_packet(mix_packet)?;
Ok(ServerResponse::Send {
remaining_bandwidth: available_bandwidth - consumed_bandwidth,
@@ -18,6 +18,7 @@ use nym_gateway_requests::{
BinaryResponse, PROTOCOL_VERSION,
};
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::forward_travel::AllowedEgress;
use nym_sphinx::DestinationAddressBytes;
use rand::{CryptoRng, Rng};
use std::{convert::TryFrom, sync::Arc, time::Duration};
@@ -89,6 +90,7 @@ impl InitialAuthenticationError {
pub(crate) struct FreshHandler<R, S, St> {
rng: R,
local_identity: Arc<identity::KeyPair>,
pub(crate) allowed_egress: AllowedEgress,
pub(crate) only_coconut_credentials: bool,
pub(crate) active_clients_store: ActiveClientsStore,
pub(crate) outbound_mix_sender: MixForwardingSender,
@@ -110,6 +112,7 @@ where
pub(crate) fn new(
rng: R,
conn: S,
allowed_egress: AllowedEgress,
only_coconut_credentials: bool,
outbound_mix_sender: MixForwardingSender,
local_identity: Arc<identity::KeyPair>,
@@ -126,6 +129,7 @@ where
local_identity,
storage,
coconut_verifier,
allowed_egress,
}
}
@@ -8,6 +8,7 @@ use crate::node::storage::Storage;
use log::*;
use nym_crypto::asymmetric::identity;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::forward_travel::AllowedEgress;
use rand::rngs::OsRng;
use std::net::SocketAddr;
use std::process;
@@ -16,6 +17,7 @@ use tokio::task::JoinHandle;
pub(crate) struct Listener {
address: SocketAddr,
allowed_egress: AllowedEgress,
local_identity: Arc<identity::KeyPair>,
only_coconut_credentials: bool,
pub(crate) coconut_verifier: Arc<CoconutVerifier>,
@@ -24,12 +26,14 @@ pub(crate) struct Listener {
impl Listener {
pub(crate) fn new(
address: SocketAddr,
allowed_egress: AllowedEgress,
local_identity: Arc<identity::KeyPair>,
only_coconut_credentials: bool,
coconut_verifier: Arc<CoconutVerifier>,
) -> Self {
Listener {
address,
allowed_egress,
local_identity,
only_coconut_credentials,
coconut_verifier,
@@ -71,6 +75,7 @@ impl Listener {
let handle = FreshHandler::new(
OsRng,
socket,
self.allowed_egress.clone(),
self.only_coconut_credentials,
outbound_mix_sender.clone(),
Arc::clone(&self.local_identity),
@@ -4,6 +4,7 @@
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use crate::node::storage::Storage;
use log::*;
use nym_mixnode_common::forward_travel::AllowedIngress;
use nym_task::TaskClient;
use std::net::SocketAddr;
use std::process;
@@ -11,13 +12,22 @@ use tokio::task::JoinHandle;
pub(crate) struct Listener {
address: SocketAddr,
allowed_ingress: AllowedIngress,
shutdown: TaskClient,
}
// TODO: this file is nearly identical to the one in mixnode
impl Listener {
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
Listener { address, shutdown }
pub(crate) fn new(
address: SocketAddr,
allowed_ingress: AllowedIngress,
shutdown: TaskClient,
) -> Self {
Listener {
address,
allowed_ingress,
shutdown,
}
}
pub(crate) async fn run<St>(&mut self, connection_handler: ConnectionHandler<St>)
@@ -42,6 +52,12 @@ impl Listener {
connection = tcp_listener.accept() => {
match connection {
Ok((socket, remote_addr)) => {
if !self.allowed_ingress.is_allowed(remote_addr.ip()) {
// TODO: perhaps this should get lowered in severity?
warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection");
continue
}
let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone().named(format!("MixnetConnectionHandler_{remote_addr}"))));
}
+36 -1
View File
@@ -28,6 +28,7 @@ use futures::channel::{mpsc, oneshot};
use log::*;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress};
use nym_network_defaults::NymNetworkDetails;
use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter};
use nym_node::wireguard::types::GatewayClientRegistry;
@@ -178,6 +179,7 @@ impl<St> Gateway<St> {
&self,
ack_sender: MixForwardingSender,
active_clients_store: ActiveClientsStore,
ingress: AllowedIngress,
shutdown: TaskClient,
) where
St: Storage + Clone + 'static,
@@ -199,7 +201,8 @@ impl<St> Gateway<St> {
self.config.gateway.mix_port,
);
mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler);
mixnet_handling::Listener::new(listening_address, ingress, shutdown)
.start(connection_handler);
}
#[cfg(feature = "wireguard")]
@@ -212,6 +215,7 @@ impl<St> Gateway<St> {
fn start_client_websocket_listener(
&self,
allowed_egress: AllowedEgress,
forwarding_channel: MixForwardingSender,
active_clients_store: ActiveClientsStore,
shutdown: TaskClient,
@@ -228,6 +232,7 @@ impl<St> Gateway<St> {
websocket::Listener::new(
listening_address,
allowed_egress,
Arc::clone(&self.identity_keypair),
self.config.gateway.only_coconut_credentials,
coconut_verifier,
@@ -240,6 +245,28 @@ impl<St> Gateway<St> {
);
}
async fn start_allowed_addresses_provider(
&self,
task_client: TaskClient,
) -> Result<(AllowedIngress, AllowedEgress), GatewayError> {
let identity = self.identity_keypair.public_key().to_base58_string();
let nyxd_endpoints = self.config.gateway.nyxd_urls.clone();
let network = NymNetworkDetails::new_from_env();
let mut provider = AllowedAddressesProvider::new(
identity,
nyxd_endpoints,
!self.config.debug.enforce_forward_travel,
Some(network),
)
.await?;
let filters = (provider.ingress(), provider.egress());
tokio::spawn(async move { provider.run(task_client).await });
Ok(filters)
}
fn start_packet_forwarder(&self, shutdown: TaskClient) -> MixForwardingSender {
info!("Starting mix packet forwarder...");
@@ -449,6 +476,12 @@ impl<St> Gateway<St> {
let shutdown = TaskManager::new(10);
let (ingress, egress) = self
.start_allowed_addresses_provider(
shutdown.subscribe().named("AllowedAddressesProvider"),
)
.await?;
let coconut_verifier = {
let nyxd_client = self.random_nyxd_client()?;
CoconutVerifier::new(nyxd_client)
@@ -461,6 +494,7 @@ impl<St> Gateway<St> {
self.start_mix_socket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
ingress,
shutdown.subscribe().named("mixnet_handling::Listener"),
);
@@ -478,6 +512,7 @@ impl<St> Gateway<St> {
}
self.start_client_websocket_listener(
egress,
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shutdown.subscribe().named("websocket::Listener"),
+4 -5
View File
@@ -18,7 +18,7 @@ rust-version = "1.58.1"
[dependencies]
axum = { workspace = true }
anyhow = "1.0.40"
anyhow = { workspace = true }
bs58 = "0.4.0"
clap = { workspace = true, features = ["cargo", "derive"] }
colored = "2.0"
@@ -26,15 +26,14 @@ cupid = "0.6.1"
dirs = "4.0"
futures = { workspace = true }
humantime-serde = "1.0"
lazy_static = "1.4.0"
lazy_static = { workspace = true }
log = { workspace = true }
pretty_env_logger = "0.4.0"
rand = "0.7.3"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sysinfo = "0.27.7"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version = "0.7.3", features = ["codec"] }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { workspace = true, features = ["codec"] }
toml = "0.5.8"
url = { workspace = true, features = ["serde"] }
cfg-if = "1.0.0"
+18 -7
View File
@@ -16,31 +16,40 @@ use std::{fs, io};
#[derive(Args, Clone)]
pub(crate) struct Init {
/// Id of the mixnode we want to create config for
#[clap(long)]
#[arg(long)]
id: String,
/// The host on which the mixnode will be running
#[clap(long)]
#[arg(long)]
host: IpAddr,
/// The port on which the mixnode will be listening for mix packets
#[clap(long)]
#[arg(long)]
mix_port: Option<u16>,
/// The port on which the mixnode will be listening for verloc packets
#[clap(long)]
#[arg(long)]
verloc_port: Option<u16>,
/// The port on which the mixnode will be listening for http requests
#[clap(long)]
#[arg(long)]
http_api_port: Option<u16>,
/// Comma separated list of nym-api endpoints of the validators
// the alias here is included for backwards compatibility (1.1.4 and before)
#[clap(long, alias = "validators", value_delimiter = ',')]
#[arg(long, alias = "validators", value_delimiter = ',')]
nym_apis: Option<Vec<url::Url>>,
#[clap(short, long, default_value_t = OutputFormat::default())]
/// Comma separated list of endpoints of the nyxd validators
#[arg(long, value_delimiter = ',')]
nyxd_urls: Option<Vec<url::Url>>,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer
#[arg(long)]
enforce_forward_travel: bool,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
@@ -52,7 +61,9 @@ impl From<Init> for OverrideConfig {
mix_port: init_config.mix_port,
verloc_port: init_config.verloc_port,
http_api_port: init_config.http_api_port,
enforce_forward_travel: Some(init_config.enforce_forward_travel),
nym_apis: init_config.nym_apis,
nyxd_urls: init_config.nyxd_urls,
}
}
}
+17 -5
View File
@@ -10,7 +10,7 @@ use colored::Colorize;
use log::{error, info, warn};
use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_bin_common::version_checker;
use nym_config::defaults::var_names::{BECH32_PREFIX, NYM_API};
use nym_config::defaults::var_names;
use nym_config::OptionalSet;
use nym_crypto::bech32_address_validation;
use std::net::IpAddr;
@@ -58,7 +58,9 @@ struct OverrideConfig {
mix_port: Option<u16>,
verloc_port: Option<u16>,
http_api_port: Option<u16>,
enforce_forward_travel: Option<bool>,
nym_apis: Option<Vec<url::Url>>,
nyxd_urls: Option<Vec<url::Url>>,
}
pub(crate) async fn execute(args: Cli) -> anyhow::Result<()> {
@@ -83,22 +85,32 @@ fn override_config(config: Config, args: OverrideConfig) -> Config {
.with_optional(Config::with_mix_port, args.mix_port)
.with_optional(Config::with_verloc_port, args.verloc_port)
.with_optional(Config::with_http_api_port, args.http_api_port)
.with_optional(
Config::with_enforce_forward_travel,
args.enforce_forward_travel,
)
.with_optional_custom_env(
Config::with_custom_nym_apis,
args.nym_apis,
NYM_API,
var_names::NYM_API,
nym_config::parse_urls,
)
.with_optional_custom_env(
Config::with_custom_nyxd,
args.nyxd_urls,
var_names::NYXD,
nym_config::parse_urls,
)
}
/// Ensures that a given bech32 address is valid, or exits
pub(crate) fn validate_bech32_address_or_exit(address: &str) {
let prefix = std::env::var(BECH32_PREFIX).expect("bech32 prefix not set");
let prefix = std::env::var(var_names::BECH32_PREFIX).expect("bech32 prefix not set");
if let Err(bech32_address_validation::Bech32Error::DecodeFailed(err)) =
bech32_address_validation::try_bech32_decode(address)
{
let error_message = format!("Error: wallet address decoding failed: {err}").red();
error!("{}", error_message);
error!("{error_message}");
error!("Exiting...");
process::exit(1);
}
@@ -107,7 +119,7 @@ pub(crate) fn validate_bech32_address_or_exit(address: &str) {
bech32_address_validation::validate_bech32_prefix(&prefix, address)
{
let error_message = format!("Error: wallet address type is wrong, {err}").red();
error!("{}", error_message);
error!("{error_message}");
error!("Exiting...");
process::exit(1);
}
+19 -8
View File
@@ -14,35 +14,44 @@ use std::net::IpAddr;
#[derive(Args, Clone)]
pub(crate) struct Run {
/// Id of the nym-mixnode we want to run
#[clap(long)]
#[arg(long)]
id: String,
/// The custom host on which the mixnode will be running
#[clap(long)]
#[arg(long)]
host: Option<IpAddr>,
/// The wallet address you will use to bond this mixnode, e.g. nymt1z9egw0knv47nmur0p8vk4rcx59h9gg4zuxrrr9
#[clap(long)]
#[arg(long)]
wallet_address: Option<nyxd::AccountId>,
/// The port on which the mixnode will be listening for mix packets
#[clap(long)]
#[arg(long)]
mix_port: Option<u16>,
/// The port on which the mixnode will be listening for verloc packets
#[clap(long)]
#[arg(long)]
verloc_port: Option<u16>,
/// The port on which the mixnode will be listening for http requests
#[clap(long)]
#[arg(long)]
http_api_port: Option<u16>,
/// Comma separated list of nym-api endpoints of the validators
// the alias here is included for backwards compatibility (1.1.4 and before)
#[clap(long, alias = "validators", value_delimiter = ',')]
#[arg(long, alias = "validators", value_delimiter = ',')]
nym_apis: Option<Vec<url::Url>>,
#[clap(short, long, default_value_t = OutputFormat::default())]
/// Comma separated list of endpoints of the nyxd validators
#[arg(long, value_delimiter = ',')]
nyxd_urls: Option<Vec<url::Url>>,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer
#[arg(long)]
enforce_forward_travel: Option<bool>,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
@@ -54,7 +63,9 @@ impl From<Run> for OverrideConfig {
mix_port: run_config.mix_port,
verloc_port: run_config.verloc_port,
http_api_port: run_config.http_api_port,
enforce_forward_travel: run_config.enforce_forward_travel,
nym_apis: run_config.nym_apis,
nyxd_urls: run_config.nyxd_urls,
}
}
}
+28
View File
@@ -175,11 +175,19 @@ impl Config {
}
// builder methods
#[must_use]
pub fn with_custom_nym_apis(mut self, nym_api_urls: Vec<Url>) -> Self {
self.mixnode.nym_api_urls = nym_api_urls;
self
}
#[must_use]
pub fn with_custom_nyxd(mut self, nyxd_urls: Vec<Url>) -> Self {
self.mixnode.nyxd_urls = nyxd_urls;
self
}
#[must_use]
pub fn with_listening_address(mut self, listening_address: IpAddr) -> Self {
self.mixnode.listening_address = listening_address;
@@ -189,22 +197,31 @@ impl Config {
self
}
#[must_use]
pub fn with_mix_port(mut self, port: u16) -> Self {
self.mixnode.mix_port = port;
self
}
#[must_use]
pub fn with_verloc_port(mut self, port: u16) -> Self {
self.mixnode.verloc_port = port;
self
}
#[must_use]
pub fn with_http_api_port(mut self, port: u16) -> Self {
let http_ip = self.http.bind_address.ip();
self.http.bind_address = SocketAddr::new(http_ip, port);
self
}
#[must_use]
pub fn with_enforce_forward_travel(mut self, forward_travel: bool) -> Self {
self.debug.enforce_forward_travel = forward_travel;
self
}
pub fn get_nym_api_endpoints(&self) -> Vec<Url> {
self.mixnode.nym_api_urls.clone()
}
@@ -231,6 +248,9 @@ pub struct MixNode {
/// Addresses to nym APIs from which the node gets the view of the network.
pub nym_api_urls: Vec<Url>,
/// Addresses to nyxd validators via which the node can communicate with the chain directly.
pub nyxd_urls: Vec<Url>,
}
impl MixNode {
@@ -242,6 +262,7 @@ impl MixNode {
mix_port: DEFAULT_MIX_LISTENING_PORT,
verloc_port: DEFAULT_VERLOC_LISTENING_PORT,
nym_api_urls: vec![Url::from_str(mainnet::NYM_API).expect("Invalid default API URL")],
nyxd_urls: vec![Url::from_str(mainnet::NYXD_URL).expect("Invalid default nyxd URL")],
}
}
}
@@ -319,6 +340,10 @@ pub struct Debug {
/// Maximum number of packets that can be stored waiting to get sent to a particular connection.
pub maximum_connection_buffer_size: usize,
/// Specifies whether this node should accepts and send out packets that would only go to nodes
/// on the next mix layer.
pub enforce_forward_travel: bool,
/// Specifies whether the mixnode should be using the legacy framing for the sphinx packets.
// it's set to true by default. The reason for that decision is to preserve compatibility with the
// existing nodes whilst everyone else is upgrading and getting the code for handling the new field.
@@ -335,6 +360,9 @@ impl Default for Debug {
packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
initial_connection_timeout: DEFAULT_INITIAL_CONNECTION_TIMEOUT,
maximum_connection_buffer_size: DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE,
// let's keep it disabled for now to not surprise operators/users
enforce_forward_travel: false,
use_legacy_framed_packet_version: false,
}
}
+15
View File
@@ -75,6 +75,8 @@ impl ConfigV1_1_32 {
impl From<ConfigV1_1_32> for Config {
fn from(value: ConfigV1_1_32) -> Self {
let network = nym_config::defaults::NymNetworkDetails::new_from_env();
Config {
// \/ ADDED
save_path: None,
@@ -104,6 +106,18 @@ impl From<ConfigV1_1_32> for Config {
mix_port: value.mixnode.mix_port,
verloc_port: value.mixnode.verloc_port,
nym_api_urls: value.mixnode.nym_api_urls,
// \/ ADDED
nyxd_urls: network
.endpoints
.into_iter()
.map(|e| {
e.nyxd_url
.parse()
.expect("malformed nyxd url in environment")
})
.collect(),
// /\ ADDED
},
storage_paths: value.storage_paths,
verloc: value.verloc.into(),
@@ -243,6 +257,7 @@ impl From<DebugV1_1_32> for Debug {
packet_forwarding_maximum_backoff: value.packet_forwarding_maximum_backoff,
initial_connection_timeout: value.initial_connection_timeout,
maximum_connection_buffer_size: value.maximum_connection_buffer_size,
enforce_forward_travel: false,
use_legacy_framed_packet_version: value.use_legacy_framed_packet_version,
}
}
+13
View File
@@ -49,6 +49,13 @@ nym_api_urls = [
{{/each}}
]
# Addresses to nyxd validators via which the node can communicate with the chain directly.
nyxd_urls = [
{{#each mixnode.nyxd_urls }}
'{{this}}',
{{/each}}
]
[http]
# Socket address this node will use for binding its http API.
# default: `0.0.0.0:8000`
@@ -80,4 +87,10 @@ node_description = '{{ storage_paths.node_description }}'
# TODO
[debug]
# Specifies whether this node should accepts and send out packets that would only go to nodes
# on the next mix layer.
enforce_forward_travel = {{ debug.enforce_forward_travel }}
"#;
+18
View File
@@ -1,6 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_mixnode_common::forward_travel::error::ForwardTravelError;
use std::io;
use std::path::PathBuf;
use thiserror::Error;
@@ -44,7 +45,24 @@ pub enum MixnodeError {
source: io::Error,
},
#[error("experienced an error during shutdown: {message}")]
ShutdownFailure { message: String },
#[error("failure in enforcing forward travel of mix packets: {source}")]
ForwardTravel {
#[from]
source: ForwardTravelError,
},
// TODO: in the future this should work the other way, i.e. NymNode depending on Gateway errors
#[error(transparent)]
NymNodeError(#[from] nym_node::error::NymNodeError),
}
impl MixnodeError {
pub(crate) fn shutdown_failure(err: Box<dyn std::error::Error + Send + Sync>) -> Self {
MixnodeError::ShutdownFailure {
message: err.to_string(),
}
}
}
@@ -7,8 +7,8 @@ use crate::node::listener::connection_handler::packet_processing::{
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
use crate::node::TaskClient;
use futures::StreamExt;
use log::debug;
use log::{error, info, warn};
use log::{debug, error, info, warn};
use nym_mixnode_common::forward_travel::AllowedEgress;
use nym_mixnode_common::measure;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
@@ -26,22 +26,37 @@ pub(crate) mod packet_processing;
#[derive(Clone)]
pub(crate) struct ConnectionHandler {
allowed_egress: AllowedEgress,
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
}
impl ConnectionHandler {
pub(crate) fn new(
allowed_egress: AllowedEgress,
packet_processor: PacketProcessor,
delay_forwarding_channel: PacketDelayForwardSender,
) -> Self {
ConnectionHandler {
allowed_egress,
packet_processor,
delay_forwarding_channel,
}
}
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
let next_hop: SocketAddr = mix_packet.next_hop().into();
// TODO: another option is to move this filter
// (which is used by EVERY `ConnectionHandler`, so potentially hundreds of times)
// to the mixnet client where we could be filtering at the time of attempting to open new outbound connections
// However, in that case we'd have gone through the troubles of possibly unnecessarily delaying the packet
if !self.allowed_egress.is_allowed(next_hop.ip()) {
// TODO: perhaps this should get lowered in severity?
warn!("received an packet that was meant to get forwarded to {next_hop}, but this address does not belong to any node on the next layer - dropping the packet");
return;
}
// determine instant at which packet should get forwarded. this way we minimise effect of
// being stuck in the queue [of the channel] to get inserted into the delay queue
let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration());
+18 -2
View File
@@ -3,6 +3,7 @@
use crate::node::listener::connection_handler::ConnectionHandler;
use log::{error, info, warn};
use nym_mixnode_common::forward_travel::AllowedIngress;
use std::net::SocketAddr;
use std::process;
use tokio::net::TcpListener;
@@ -14,12 +15,21 @@ pub(crate) mod connection_handler;
pub(crate) struct Listener {
address: SocketAddr,
allowed_ingress: AllowedIngress,
shutdown: TaskClient,
}
impl Listener {
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
Listener { address, shutdown }
pub(crate) fn new(
address: SocketAddr,
allowed_ingress: AllowedIngress,
shutdown: TaskClient,
) -> Self {
Listener {
address,
allowed_ingress,
shutdown,
}
}
async fn run(&mut self, connection_handler: ConnectionHandler) {
@@ -41,6 +51,12 @@ impl Listener {
connection = listener.accept() => {
match connection {
Ok((socket, remote_addr)) => {
if !self.allowed_ingress.is_allowed(remote_addr.ip()) {
// TODO: perhaps this should get lowered in severity?
warn!("received an incoming connection from {remote_addr}, but this address does not belong to any node on the previous layer - dropping the connection");
continue
}
let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
}
+49 -8
View File
@@ -15,7 +15,9 @@ use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSende
use log::{error, info, warn};
use nym_bin_common::output_format::OutputFormat;
use nym_bin_common::version_checker::parse_version;
use nym_config::defaults::NymNetworkDetails;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnode_common::forward_travel::{AllowedAddressesProvider, AllowedEgress, AllowedIngress};
use nym_mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_task::{TaskClient, TaskManager};
use rand::seq::SliceRandom;
@@ -101,6 +103,8 @@ impl MixNode {
&self,
node_stats_update_sender: node_statistics::UpdateSender,
delay_forwarding_channel: PacketDelayForwardSender,
ingress: AllowedIngress,
egress: AllowedEgress,
shutdown: TaskClient,
) {
info!("Starting socket listener...");
@@ -108,14 +112,37 @@ impl MixNode {
let packet_processor =
PacketProcessor::new(self.sphinx_keypair.private_key(), node_stats_update_sender);
let connection_handler = ConnectionHandler::new(packet_processor, delay_forwarding_channel);
let connection_handler =
ConnectionHandler::new(egress, packet_processor, delay_forwarding_channel);
let listening_address = SocketAddr::new(
self.config.mixnode.listening_address,
self.config.mixnode.mix_port,
);
Listener::new(listening_address, shutdown).start(connection_handler);
Listener::new(listening_address, ingress, shutdown).start(connection_handler);
}
async fn start_allowed_addresses_provider(
&self,
task_client: TaskClient,
) -> Result<(AllowedIngress, AllowedEgress), MixnodeError> {
let identity = self.identity_keypair.public_key().to_base58_string();
let nyxd_endpoints = self.config.mixnode.nyxd_urls.clone();
let network = NymNetworkDetails::new_from_env();
let mut provider = AllowedAddressesProvider::new(
identity,
nyxd_endpoints,
!self.config.debug.enforce_forward_travel,
Some(network),
)
.await?;
let filters = (provider.ingress(), provider.egress());
tokio::spawn(async move { provider.run(task_client).await });
Ok(filters)
}
fn start_packet_delay_forwarder(
@@ -215,12 +242,16 @@ impl MixNode {
})
}
async fn wait_for_interrupt(&self, shutdown: TaskManager) {
let _res = shutdown.catch_interrupt().await;
log::info!("Stopping nym mixnode");
async fn wait_for_interrupt(
&self,
shutdown: TaskManager,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let res = shutdown.catch_interrupt().await;
log::info!("Stopping nym gateway");
res
}
pub async fn run(&mut self) -> Result<(), MixnodeError> {
pub(crate) async fn run(&mut self) -> Result<(), MixnodeError> {
info!("Starting nym mixnode");
if self.check_if_bonded().await {
@@ -229,6 +260,12 @@ impl MixNode {
let shutdown = TaskManager::default();
let (ingress, egress) = self
.start_allowed_addresses_provider(
shutdown.subscribe().named("AllowedAddressesProvider"),
)
.await?;
let (node_stats_pointer, node_stats_update_sender) = self
.start_node_stats_controller(shutdown.subscribe().named("node_statistics::Controller"));
let delay_forwarding_channel = self.start_packet_delay_forwarder(
@@ -238,6 +275,8 @@ impl MixNode {
self.start_socket_listener(
node_stats_update_sender,
delay_forwarding_channel,
ingress,
egress,
shutdown.subscribe().named("Listener"),
);
let atomic_verloc_results =
@@ -253,7 +292,9 @@ impl MixNode {
)?;
info!("Finished nym mixnode startup procedure - it should now be able to receive mix traffic!");
self.wait_for_interrupt(shutdown).await;
Ok(())
self.wait_for_interrupt(shutdown)
.await
.map_err(MixnodeError::shutdown_failure)
}
}
+2 -2
View File
@@ -38,13 +38,13 @@ serde_json = { workspace = true }
tap = "1.0"
thiserror = { workspace = true }
time = { workspace = true, features = ["serde-human-readable", "parsing"] }
tokio = { version = "1.24.1", features = [
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
"signal",
"time",
] }
tokio-stream = "0.1.11"
tokio-stream = { workspace = true }
url = { workspace = true }
ts-rs = { workspace = true, optional = true}
+4 -4
View File
@@ -7448,9 +7448,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.31.0"
version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"backtrace",
"bytes",
@@ -7557,9 +7557,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
dependencies = [
"bytes",
"futures-core",
+2 -2
View File
@@ -5797,9 +5797,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.31.0"
version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"backtrace",
"bytes",
+1
View File
@@ -8,6 +8,7 @@ localnetdir="$HOME/.nym/localnets/localnet.$suffix"
mkdir -p "$localnetdir"
echo "Using $localnetdir for the localnet"
cargo build --release
# initialise mixnet
echo "initialising mixnode1..."
+2 -2
View File
@@ -48,8 +48,8 @@ nym-bin-common = { path = "../../../common/bin-common" }
# extra dependencies for libp2p examples
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
tokio-stream = "0.1.12"
tokio-util = { version = "0.7", features = ["codec"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
parking_lot = "0.12"
hex = "0.4"