d8c84cc4d6
* wip * wip: wrap node's sphinx key with a manager * wip: choosing correct key for packet processing * further propagation of key rotation information * attaching key rotation information to reply surbs * added basic key rotation information to mixnet contract * wip: introducing cached queries for key rotation info from nym api * unified nym-api contract cache refreshing * finish packet decoding * multi api client + retrieving rotation id * rotating sphinx key files * logic for migrating config file * wip: putting new sphinx keys to self described endpoints * processing loop of KeyRotationController * fixed sphinx key loading * rotating bloomfilters * wired up KeyRotationController * flushing bloomfilters to disk and loading * most of nym-node changes * post rebase fixes * fixes due to backwards compatible hostkeys * split http state.rs file * dont use deprecated fields * fixed backwards compatible deserialisation of host information * split up node describe cache * added a dedicated CacheRefresher listener to perform full refresh outside the set interval * controlling announced sphinx keys within nym-api * retrieving rotation id when pulling topology * split nym-nodes http handlers * v2 nym-api endpoints to retrieve nodes with additional metadata information * bug fixes... 
* additional bugfixes and guards against stuck epoch * testnet manager: set first nym-api as the rewarder * fixed host information deserialisation * fixed panic during first key rotation * post rebase fixes * clippy * more guards against stuck epochs * added helper method to reset node's sphinx key * instantiate mixnet contract with custom key rotation validity * additional bugfixes and debugging nym-api deadlock * passing shutdown to nym apis client * remove dead test * post rebasing fixes * missing MixnetQueryClient variants * remove usage of deprecated methods in sdk example * fix: incorrect method signature * post rebasing fixes * attempt to retrieve key rotation id before doing any config migration work * ignore tests relying on networking behaviour * allow networking failures in certain tests
335 lines
12 KiB
Rust
335 lines
12 KiB
Rust
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
use dashmap::DashMap;
|
|
use futures::StreamExt;
|
|
use nym_sphinx::forwarding::packet::MixPacket;
|
|
use nym_sphinx::framing::codec::NymCodec;
|
|
use nym_sphinx::framing::packet::FramedNymPacket;
|
|
use std::io;
|
|
use std::net::SocketAddr;
|
|
use std::ops::Deref;
|
|
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::net::TcpStream;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::mpsc::error::TrySendError;
|
|
use tokio::time::sleep;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tokio_util::codec::Framed;
|
|
use tracing::*;
|
|
|
|
/// Tunables of the mixnet [`Client`]: reconnection backoff bounds, the connection timeout and
/// the size of each per-connection packet queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Config {
    /// Base value of the reconnection backoff; it gets multiplied by a power of two that grows
    /// with the number of failed connection attempts.
    pub initial_reconnection_backoff: Duration,

    /// Upper bound on the reconnection backoff.
    pub maximum_reconnection_backoff: Duration,

    /// How long a single connection attempt may take before it is abandoned.
    pub initial_connection_timeout: Duration,

    /// Capacity of the bounded channel buffering packets for a single connection.
    pub maximum_connection_buffer_size: usize,
}
|
|
|
|
impl Config {
|
|
pub fn new(
|
|
initial_reconnection_backoff: Duration,
|
|
maximum_reconnection_backoff: Duration,
|
|
initial_connection_timeout: Duration,
|
|
maximum_connection_buffer_size: usize,
|
|
) -> Self {
|
|
Config {
|
|
initial_reconnection_backoff,
|
|
maximum_reconnection_backoff,
|
|
initial_connection_timeout,
|
|
maximum_connection_buffer_size,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub trait SendWithoutResponse {
|
|
// Without response in this context means we will not listen for anything we might get back (not
|
|
// that we should get anything), including any possible io errors
|
|
fn send_without_response(&self, packet: MixPacket) -> io::Result<()>;
|
|
}
|
|
|
|
pub struct Client {
|
|
active_connections: ActiveConnections,
|
|
connections_count: Arc<AtomicUsize>,
|
|
config: Config,
|
|
}
|
|
|
|
#[derive(Default, Clone)]
|
|
pub struct ActiveConnections {
|
|
inner: Arc<DashMap<SocketAddr, ConnectionSender>>,
|
|
}
|
|
|
|
impl ActiveConnections {
|
|
pub fn pending_packets(&self) -> usize {
|
|
self.inner
|
|
.iter()
|
|
.map(|sender| {
|
|
let max_capacity = sender.channel.max_capacity();
|
|
let capacity = sender.channel.capacity();
|
|
max_capacity - capacity
|
|
})
|
|
.sum()
|
|
}
|
|
}
|
|
|
|
impl Deref for ActiveConnections {
|
|
type Target = DashMap<SocketAddr, ConnectionSender>;
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.inner
|
|
}
|
|
}
|
|
|
|
pub struct ConnectionSender {
|
|
channel: mpsc::Sender<FramedNymPacket>,
|
|
current_reconnection_attempt: Arc<AtomicU32>,
|
|
}
|
|
|
|
impl ConnectionSender {
|
|
fn new(channel: mpsc::Sender<FramedNymPacket>) -> Self {
|
|
ConnectionSender {
|
|
channel,
|
|
current_reconnection_attempt: Arc::new(AtomicU32::new(0)),
|
|
}
|
|
}
|
|
}
|
|
|
|
struct ManagedConnection {
|
|
address: SocketAddr,
|
|
message_receiver: ReceiverStream<FramedNymPacket>,
|
|
connection_timeout: Duration,
|
|
current_reconnection: Arc<AtomicU32>,
|
|
}
|
|
|
|
impl ManagedConnection {
|
|
fn new(
|
|
address: SocketAddr,
|
|
message_receiver: mpsc::Receiver<FramedNymPacket>,
|
|
connection_timeout: Duration,
|
|
current_reconnection: Arc<AtomicU32>,
|
|
) -> Self {
|
|
ManagedConnection {
|
|
address,
|
|
message_receiver: ReceiverStream::new(message_receiver),
|
|
connection_timeout,
|
|
current_reconnection,
|
|
}
|
|
}
|
|
|
|
async fn run(self) {
|
|
let address = self.address;
|
|
let connection_fut = TcpStream::connect(address);
|
|
|
|
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
|
|
Ok(stream_res) => match stream_res {
|
|
Ok(stream) => {
|
|
debug!("Managed to establish connection to {}", self.address);
|
|
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
|
self.current_reconnection.store(0, Ordering::Release);
|
|
Framed::new(stream, NymCodec)
|
|
}
|
|
Err(err) => {
|
|
debug!("failed to establish connection to {address} (err: {err})",);
|
|
return;
|
|
}
|
|
},
|
|
Err(_) => {
|
|
debug!(
|
|
"failed to connect to {address} within {:?}",
|
|
self.connection_timeout
|
|
);
|
|
|
|
// we failed to connect - increase reconnection attempt
|
|
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Take whatever the receiver channel produces and put it on the connection.
|
|
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
|
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
|
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
|
|
warn!("Failed to forward packets to {address}: {err}");
|
|
}
|
|
|
|
debug!(
|
|
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
|
|
);
|
|
}
|
|
}
|
|
|
|
impl Client {
|
|
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
|
|
Client {
|
|
active_connections: Default::default(),
|
|
connections_count,
|
|
config,
|
|
}
|
|
}
|
|
|
|
pub fn active_connections(&self) -> ActiveConnections {
|
|
self.active_connections.clone()
|
|
}
|
|
|
|
/// If we're trying to reconnect, determine how long we should wait.
|
|
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
|
if current_attempt == 0 {
|
|
None
|
|
} else {
|
|
let exp = 2_u32.checked_pow(current_attempt);
|
|
let backoff = exp
|
|
.and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
|
|
.unwrap_or(self.config.maximum_reconnection_backoff);
|
|
|
|
Some(std::cmp::min(
|
|
backoff,
|
|
self.config.maximum_reconnection_backoff,
|
|
))
|
|
}
|
|
}
|
|
|
|
fn make_connection(&self, address: SocketAddr, pending_packet: FramedNymPacket) {
|
|
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
|
|
|
|
// this CAN'T fail because we just created the channel which has a non-zero capacity
|
|
if self.config.maximum_connection_buffer_size > 0 {
|
|
sender.try_send(pending_packet).unwrap();
|
|
}
|
|
|
|
// if we already tried to connect to `address` before, grab the current attempt count
|
|
let current_reconnection_attempt =
|
|
if let Some(mut existing) = self.active_connections.get_mut(&address) {
|
|
existing.channel = sender;
|
|
Arc::clone(&existing.current_reconnection_attempt)
|
|
} else {
|
|
let new_entry = ConnectionSender::new(sender);
|
|
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
|
self.active_connections.insert(address, new_entry);
|
|
current_attempt
|
|
};
|
|
|
|
// load the actual value.
|
|
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
|
let backoff = self.determine_backoff(reconnection_attempt);
|
|
|
|
// copy the value before moving into another task
|
|
let initial_connection_timeout = self.config.initial_connection_timeout;
|
|
|
|
let connections_count = self.connections_count.clone();
|
|
tokio::spawn(async move {
|
|
// before executing the manager, wait for what was specified, if anything
|
|
if let Some(backoff) = backoff {
|
|
trace!("waiting for {:?} before attempting connection", backoff);
|
|
sleep(backoff).await;
|
|
}
|
|
|
|
connections_count.fetch_add(1, Ordering::SeqCst);
|
|
ManagedConnection::new(
|
|
address,
|
|
receiver,
|
|
initial_connection_timeout,
|
|
current_reconnection_attempt,
|
|
)
|
|
.run()
|
|
.await;
|
|
connections_count.fetch_sub(1, Ordering::SeqCst);
|
|
});
|
|
}
|
|
}
|
|
|
|
impl SendWithoutResponse for Client {
|
|
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
|
|
let address = packet.next_hop_address();
|
|
trace!("Sending packet to {address}");
|
|
let framed_packet = FramedNymPacket::from(packet);
|
|
|
|
let Some(sender) = self.active_connections.get_mut(&address) else {
|
|
// there was never a connection to begin with
|
|
debug!("establishing initial connection to {address}");
|
|
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
|
// for sending for as soon as the connection is created
|
|
self.make_connection(address, framed_packet);
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::NotConnected,
|
|
"connection is in progress",
|
|
));
|
|
};
|
|
|
|
let sending_res = sender.channel.try_send(framed_packet);
|
|
drop(sender);
|
|
|
|
sending_res.map_err(|err| {
|
|
match err {
|
|
TrySendError::Full(_) => {
|
|
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
|
|
// it's not a 'big' error, but we did not manage to send the packet
|
|
// if the queue is full, we can't really do anything but to drop the packet
|
|
io::Error::new(
|
|
io::ErrorKind::WouldBlock,
|
|
"connection queue is full",
|
|
)
|
|
}
|
|
TrySendError::Closed(dropped) => {
|
|
debug!(
|
|
"Connection to {address} seems to be dead. attempting to re-establish it...",
|
|
);
|
|
|
|
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
|
// it up to send it as soon as the connection is re-established
|
|
self.make_connection(address, dropped);
|
|
io::Error::new(
|
|
io::ErrorKind::ConnectionAborted,
|
|
"reconnection attempt is in progress",
|
|
)
|
|
}
|
|
}
|
|
} )
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Client with a 10s initial backoff, a 5min maximum backoff and no open connections.
    fn dummy_client() -> Client {
        let config = Config {
            initial_reconnection_backoff: Duration::from_millis(10_000),
            maximum_reconnection_backoff: Duration::from_millis(300_000),
            initial_connection_timeout: Duration::from_millis(1_500),
            maximum_connection_buffer_size: 128,
        };

        Client::new(config, Default::default())
    }

    #[test]
    fn determining_backoff_works_regardless_of_attempt() {
        let client = dummy_client();
        let maximum = client.config.maximum_reconnection_backoff;

        // no backoff before the very first attempt
        assert!(client.determine_backoff(0).is_none());

        for attempt in [1, 2] {
            assert!(client.determine_backoff(attempt).is_some());
        }

        // large attempt counts must saturate at the maximum rather than overflow
        for attempt in [16, 32, 1024, 65536, u32::MAX] {
            assert_eq!(client.determine_backoff(attempt).unwrap(), maximum);
        }
    }
}
|