squashing before rebasing
This commit is contained in:
Generated
+146
-6
@@ -1275,7 +1275,7 @@ dependencies = [
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"socket2",
|
||||
"socket2 0.4.9",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
@@ -2518,7 +2518,7 @@ dependencies = [
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"socket2 0.4.9",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@@ -2537,7 +2537,7 @@ dependencies = [
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-native-certs 0.5.0",
|
||||
"tokio",
|
||||
"tokio-rustls 0.22.0",
|
||||
"tower-service",
|
||||
@@ -2555,7 +2555,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"log",
|
||||
"rustls 0.19.1",
|
||||
"rustls-native-certs",
|
||||
"rustls-native-certs 0.5.0",
|
||||
"tokio",
|
||||
"tokio-rustls 0.22.0",
|
||||
"webpki 0.21.4",
|
||||
@@ -3804,6 +3804,7 @@ dependencies = [
|
||||
"atty",
|
||||
"bip39",
|
||||
"bs58",
|
||||
"bytes",
|
||||
"clap 4.2.7",
|
||||
"colored",
|
||||
"dashmap 4.0.2",
|
||||
@@ -3831,7 +3832,10 @@ dependencies = [
|
||||
"nym-validator-client",
|
||||
"once_cell",
|
||||
"pretty_env_logger",
|
||||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rcgen",
|
||||
"rustls 0.21.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx 0.5.13",
|
||||
@@ -3919,10 +3923,13 @@ dependencies = [
|
||||
name = "nym-mixnet-client"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures",
|
||||
"log",
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"quinn",
|
||||
"rustls 0.21.2",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
@@ -3952,6 +3959,7 @@ version = "1.1.23"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bs58",
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"clap 4.2.7",
|
||||
"colored",
|
||||
@@ -3979,8 +3987,11 @@ dependencies = [
|
||||
"nym-validator-client",
|
||||
"opentelemetry",
|
||||
"pretty_env_logger",
|
||||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rcgen",
|
||||
"rocket",
|
||||
"rustls 0.21.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sysinfo",
|
||||
@@ -4197,7 +4208,7 @@ dependencies = [
|
||||
name = "nym-pemstore"
|
||||
version = "0.3.0"
|
||||
dependencies = [
|
||||
"pem",
|
||||
"pem 0.8.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5060,6 +5071,16 @@ dependencies = [
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
|
||||
dependencies = [
|
||||
"base64 0.21.2",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
@@ -5399,6 +5420,54 @@ version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21252f1c0fc131f1b69182db8f34837e8a69737b8251dff75636a9be0518c324"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"pin-project-lite",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash",
|
||||
"rustls 0.21.2",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85af4ed6ee5a89f26a26086e9089a6643650544c025158449a3626ebf72884b3"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"rand 0.8.5",
|
||||
"ring",
|
||||
"rustc-hash",
|
||||
"rustls 0.21.2",
|
||||
"rustls-native-certs 0.6.3",
|
||||
"slab",
|
||||
"thiserror",
|
||||
"tinyvec",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6df19e284d93757a9fb91d63672f7741b129246a669db09d1c0063071debc0c0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"libc",
|
||||
"socket2 0.5.3",
|
||||
"tracing",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.27"
|
||||
@@ -5641,6 +5710,18 @@ dependencies = [
|
||||
"num_cpus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rcgen"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4954fbc00dcd4d8282c987710e50ba513d351400dbdd00e803a05172a90d8976"
|
||||
dependencies = [
|
||||
"pem 2.0.1",
|
||||
"ring",
|
||||
"time 0.3.21",
|
||||
"yasna",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rdrand"
|
||||
version = "0.4.0"
|
||||
@@ -5932,6 +6013,12 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.2.3"
|
||||
@@ -5989,6 +6076,18 @@ dependencies = [
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.21.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
"rustls-webpki",
|
||||
"sct 0.7.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.5.0"
|
||||
@@ -6001,6 +6100,18 @@ dependencies = [
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "1.0.2"
|
||||
@@ -6010,6 +6121,16 @@ dependencies = [
|
||||
"base64 0.21.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.100.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.12"
|
||||
@@ -6483,6 +6604,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sphinx-packet"
|
||||
version = "0.1.0"
|
||||
@@ -7132,7 +7263,7 @@ dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"socket2 0.4.9",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.48.0",
|
||||
@@ -8215,6 +8346,15 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
|
||||
|
||||
[[package]]
|
||||
name = "yasna"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd"
|
||||
dependencies = [
|
||||
"time 0.3.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.6.0"
|
||||
|
||||
@@ -40,6 +40,8 @@ pub fn setup_logging() {
|
||||
.filter_module("tokio_tungstenite", log::LevelFilter::Warn)
|
||||
.filter_module("handlebars", log::LevelFilter::Warn)
|
||||
.filter_module("sled", log::LevelFilter::Warn)
|
||||
.filter_module("quinn", log::LevelFilter::Warn)
|
||||
.filter_module("rustls", log::LevelFilter::Warn)
|
||||
.init();
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,9 @@ futures = "0.3"
|
||||
log = { workspace = true }
|
||||
tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
rustls = {version = "0.21.1", features = ["dangerous_configuration", "quic"]}
|
||||
quinn = "0.10.1"
|
||||
bytes = "*"
|
||||
|
||||
# internal
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
|
||||
@@ -2,22 +2,23 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::*;
|
||||
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_sphinx::NymPacket;
|
||||
use quinn::{ClientConfig, Endpoint};
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::time::sleep;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::codec::FramedWrite;
|
||||
|
||||
pub struct Config {
|
||||
initial_reconnection_backoff: Duration,
|
||||
@@ -64,6 +65,7 @@ pub struct Client {
|
||||
struct ConnectionSender {
|
||||
channel: mpsc::Sender<FramedNymPacket>,
|
||||
current_reconnection_attempt: Arc<AtomicU32>,
|
||||
last_used: Instant,
|
||||
}
|
||||
|
||||
impl ConnectionSender {
|
||||
@@ -71,6 +73,7 @@ impl ConnectionSender {
|
||||
ConnectionSender {
|
||||
channel,
|
||||
current_reconnection_attempt: Arc::new(AtomicU32::new(0)),
|
||||
last_used: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,19 +88,25 @@ impl Client {
|
||||
|
||||
async fn manage_connection(
|
||||
address: SocketAddr,
|
||||
receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
mut receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: &AtomicU32,
|
||||
) {
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
let mut endpoint = Endpoint::client("0.0.0.0:0".parse::<SocketAddr>().unwrap()).unwrap();
|
||||
endpoint.set_default_client_config(configure_client());
|
||||
|
||||
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
||||
let conn = match tokio::time::timeout(
|
||||
connection_timeout,
|
||||
endpoint.connect(address, "mixnode").unwrap(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
Ok(connection) => {
|
||||
debug!("Managed to establish connection to {}", address);
|
||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||
current_reconnection.store(0, Ordering::Release);
|
||||
Framed::new(stream, NymCodec)
|
||||
connection
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
@@ -118,18 +127,26 @@ impl Client {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
loop {
|
||||
let pkt = match receiver.next().await {
|
||||
Some(pkt) => pkt,
|
||||
None => {
|
||||
debug!("No more packet to send to {}", address);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let send = match conn.open_uni().await {
|
||||
Ok(send_stream) => send_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to open uni stream, dropping packet - {err:?}");
|
||||
return; //We shouldn't get a time out here, it should be handled higher
|
||||
}
|
||||
};
|
||||
let mut framed_stream = FramedWrite::new(send, NymCodec);
|
||||
if let Err(err) = framed_stream.send(pkt).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
|
||||
address
|
||||
);
|
||||
}
|
||||
|
||||
/// If we're trying to reconnect, determine how long we should wait.
|
||||
@@ -205,36 +222,53 @@ impl SendWithoutResponse for Client {
|
||||
FramedNymPacket::new(packet, packet_type, self.config.use_legacy_version);
|
||||
|
||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
}
|
||||
if sender.last_used.elapsed().as_millis() > 9_000 {
|
||||
// default timeout is 10sec, let's take some margin for the operation to run.
|
||||
//connection is near timeout, let's just recreate one
|
||||
sender.channel.close_channel();
|
||||
self.conn_new.remove(&address);
|
||||
debug!(
|
||||
"connection near or past timemout. Reconnecting to {}",
|
||||
address
|
||||
);
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"re-connection is in progress",
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
sender.last_used = Instant::now();
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// there was never a connection to begin with
|
||||
@@ -292,3 +326,35 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Implementation of `ServerCertVerifier` that verifies everything as trustworthy.
|
||||
struct SkipServerVerification;
|
||||
|
||||
impl SkipServerVerification {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::Certificate,
|
||||
_intermediates: &[rustls::Certificate],
|
||||
_server_name: &rustls::ServerName,
|
||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: std::time::SystemTime,
|
||||
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
fn configure_client() -> ClientConfig {
|
||||
let crypto = rustls::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_custom_certificate_verifier(SkipServerVerification::new())
|
||||
.with_no_client_auth();
|
||||
|
||||
ClientConfig::new(Arc::new(crypto))
|
||||
}
|
||||
|
||||
@@ -43,6 +43,10 @@ tokio-tungstenite = "0.14"
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
zeroize = { workspace = true }
|
||||
quinn = "0.10.1"
|
||||
rustls = {version = "0.21.1", features = ["quic"]}
|
||||
rcgen = "*"
|
||||
bytes = "*"
|
||||
|
||||
# internal
|
||||
nym-api-requests = { path = "../nym-api/nym-api-requests" }
|
||||
|
||||
@@ -15,10 +15,9 @@ use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use nym_task::TaskClient;
|
||||
use quinn::{Connection, ConnectionError};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::codec::FramedRead;
|
||||
|
||||
pub(crate) struct ConnectionHandler<St: Storage> {
|
||||
packet_processor: PacketProcessor,
|
||||
@@ -174,23 +173,37 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.handle_processed_packet(processed_final_hop).await
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
mut self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
pub(crate) async fn handle_connection(mut self, conn: Connection, mut shutdown: TaskClient) {
|
||||
debug!(
|
||||
"Starting connection handler for {:?}",
|
||||
conn.remote_address()
|
||||
);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
recv = conn.accept_uni() => {
|
||||
let recv_stream = match recv {
|
||||
Ok(recv_stream) => recv_stream,
|
||||
Err(err) => {
|
||||
match err {
|
||||
ConnectionError::TimedOut => {
|
||||
//normal timeout, we just need to drop the connection
|
||||
break;
|
||||
},
|
||||
_ => {
|
||||
error!("Error accepting uni stream - {err:?}");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
let mut framed_stream = FramedRead::new(recv_stream, NymCodec);
|
||||
match framed_stream.next().await {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
@@ -212,9 +225,6 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
info!("Closing connection from {:?}", conn.remote_address());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,10 @@ use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandle
|
||||
use crate::node::storage::Storage;
|
||||
use log::*;
|
||||
use nym_task::TaskClient;
|
||||
use quinn::{Endpoint, ServerConfig};
|
||||
use rcgen::generate_simple_self_signed;
|
||||
use rustls::{Certificate, PrivateKey};
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
@@ -25,13 +27,7 @@ impl Listener {
|
||||
St: Storage + Clone + 'static,
|
||||
{
|
||||
info!("Starting mixnet listener at {}", self.address);
|
||||
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
process::exit(1);
|
||||
}
|
||||
};
|
||||
let endpoint = Endpoint::server(server_config(), self.address).unwrap();
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
@@ -39,13 +35,22 @@ impl Listener {
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("mixnet_handling::Listener: Received shutdown");
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
connection = endpoint.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
Some(connecting) => {
|
||||
match connecting.await {
|
||||
Ok(conn) => {
|
||||
debug!("Handling connection from {:?}", conn.remote_address());
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(conn, self.shutdown.clone()));
|
||||
},
|
||||
Err(err) => error!("Failed to establish connection - {err:?}"),
|
||||
}
|
||||
}
|
||||
Err(err) => warn!("failed to get client: {err}"),
|
||||
None => {
|
||||
error!("Endpoint closed");
|
||||
break;
|
||||
}, // stream got closed by remote
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -61,3 +66,13 @@ impl Listener {
|
||||
tokio::spawn(async move { self.run(connection_handler).await })
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_self_signed_cert() -> Result<(Certificate, PrivateKey), Box<dyn std::error::Error>> {
|
||||
let cert = generate_simple_self_signed(vec!["mixnode".to_string()])?;
|
||||
let key = PrivateKey(cert.serialize_private_key_der());
|
||||
Ok((Certificate(cert.serialize_der()?), key))
|
||||
}
|
||||
fn server_config() -> ServerConfig {
|
||||
let (cert, key) = generate_self_signed_cert().expect("Failed to generate certificate");
|
||||
ServerConfig::with_single_cert(vec![cert], key).expect("Failed to generate server config")
|
||||
}
|
||||
|
||||
+5
-1
@@ -33,10 +33,14 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sysinfo = "0.27.7"
|
||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec", "net"] }
|
||||
toml = "0.5.8"
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
cfg-if = "1.0.0"
|
||||
quinn = "0.10.1"
|
||||
rustls = {version = "0.21.1", features = ["quic"]}
|
||||
rcgen = "*"
|
||||
bytes = "*"
|
||||
|
||||
## tracing
|
||||
tracing = { version = "0.1.37", optional = true }
|
||||
|
||||
@@ -12,10 +12,10 @@ use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::Delay as SphinxDelay;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use quinn::{Connection, ConnectionError};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::codec::FramedRead;
|
||||
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
@@ -54,7 +54,7 @@ impl ConnectionHandler {
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, framed_sphinx_packet), fields(cpucycles))
|
||||
)]
|
||||
fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
async fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
// packet processor for vpn packets,
|
||||
@@ -78,37 +78,51 @@ impl ConnectionHandler {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
pub(crate) async fn handle_connection(self, conn: Connection, mut shutdown: TaskClient) {
|
||||
debug!(
|
||||
"Starting connection handler for {:?}",
|
||||
conn.remote_address()
|
||||
);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
|
||||
recv = conn.accept_uni() => {
|
||||
let recv_stream = match recv {
|
||||
Ok(recv_stream) => recv_stream,
|
||||
Err(err) => {
|
||||
match err {
|
||||
ConnectionError::TimedOut => {
|
||||
//normal timeout, we just need to drop the connection
|
||||
break;
|
||||
},
|
||||
_ => {
|
||||
error!("Error accepting uni stream - {err:?}");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
let mut framed_stream = FramedRead::new(recv_stream, NymCodec);
|
||||
match framed_stream.next().await {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously (without delaying inside of course,
|
||||
// delay is moved to a global DelayQueue)
|
||||
// under higher load in single and multi-threaded situation.
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
self.handle_received_packet(framed_sphinx_packet);
|
||||
self.handle_received_packet(framed_sphinx_packet).await;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
@@ -118,10 +132,7 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
info!("Closing connection from {:?}", conn.remote_address());
|
||||
log::trace!("ConnectionHandler: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::listener::connection_handler::ConnectionHandler;
|
||||
use quinn::{Endpoint, ServerConfig};
|
||||
use rcgen::generate_simple_self_signed;
|
||||
use rustls::{Certificate, PrivateKey};
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::error;
|
||||
@@ -25,27 +26,36 @@ impl Listener {
|
||||
|
||||
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
||||
log::trace!("Starting Listener");
|
||||
let listener = match TcpListener::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
process::exit(1);
|
||||
}
|
||||
};
|
||||
let endpoint = Endpoint::server(server_config(), self.address).unwrap();
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("Listener: Received shutdown");
|
||||
}
|
||||
connection = listener.accept() => {
|
||||
},
|
||||
connection = endpoint.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
Some(connecting) => {
|
||||
match connecting.await {
|
||||
Ok(conn) => {
|
||||
debug!("Handling connection from {:?}", conn.remote_address());
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(conn, self.shutdown.clone()));
|
||||
},
|
||||
Err(err) => error!("Failed to establish connection - {err:?}"),
|
||||
}
|
||||
}
|
||||
Err(err) => warn!("Failed to accept incoming connection - {err}"),
|
||||
// Some(Err(err)) => {
|
||||
// error!(
|
||||
// "The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
// );
|
||||
// return;
|
||||
// }
|
||||
None => {
|
||||
error!("Endpoint closed");
|
||||
break;
|
||||
}, // stream got closed by remote
|
||||
}
|
||||
},
|
||||
};
|
||||
@@ -59,3 +69,14 @@ impl Listener {
|
||||
tokio::spawn(async move { self.run(connection_handler).await })
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_self_signed_cert(
|
||||
) -> Result<(rustls::Certificate, rustls::PrivateKey), Box<dyn std::error::Error>> {
|
||||
let cert = generate_simple_self_signed(vec!["mixnode".to_string()])?;
|
||||
let key = PrivateKey(cert.serialize_private_key_der());
|
||||
Ok((Certificate(cert.serialize_der()?), key))
|
||||
}
|
||||
fn server_config() -> ServerConfig {
|
||||
let (cert, key) = generate_self_signed_cert().expect("Failed to generate certificate");
|
||||
ServerConfig::with_single_cert(vec![cert], key).expect("Failed to generate server config")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user