Compare commits

...

17 Commits

Author SHA1 Message Date
Jon Häggblad 21471a78c3 client-core: downgrade a warning log msg to debug 2022-10-27 12:44:33 +02:00
Jon Häggblad 9a7a7f5615 client: move creating mix msg channel to mix traffic controller 2022-10-27 12:44:33 +02:00
Jon Häggblad 8e69a43f27 client-core: document constants 2022-10-27 12:44:33 +02:00
Jon Häggblad 3b4664fee9 out queue controller: simplify with just send 2022-10-27 12:44:33 +02:00
Jon Häggblad 5feb354568 changelog: add note 2022-10-27 12:44:32 +02:00
Jon Häggblad 777053bda1 Remove WIP comments 2022-10-27 12:44:09 +02:00
Jon Häggblad ed47942a53 Remove outdated comment 2022-10-27 12:44:09 +02:00
Jon Häggblad ec5f789c16 Revert "client-core: downgrade two debug statements to trace"
This reverts commit e0a7772fafac7bff0e4a2c50ba25e94b52b794e6.
2022-10-27 12:44:09 +02:00
Jon Häggblad 37381f9f1a client-core: rework delay controller 2022-10-27 12:44:09 +02:00
Jon Häggblad 3c9774314f wasm-client: add tokio dependency 2022-10-27 12:44:09 +02:00
Jon Häggblad 100634099e sending delay controller: tweak parameters 2022-10-27 12:44:09 +02:00
Jon Häggblad 51a2c9c180 client-core: downgrade two debug statements to trace 2022-10-27 12:44:09 +02:00
Jon Häggblad e0f5455739 client: introduce SendingDelayController 2022-10-27 12:44:09 +02:00
Jon Häggblad 48de8c5cf7 wasm-client: update channel 2022-10-27 12:44:09 +02:00
Jon Häggblad c23ec0e8f8 rustfmt 2022-10-27 12:44:09 +02:00
Jon Häggblad 1f4f153963 clients: dynamically adjust sending delay in steps 2022-10-27 12:44:09 +02:00
Jon Häggblad 67af175e18 clients: change mix traffic channel to bounded 2022-10-27 12:44:09 +02:00
7 changed files with 226 additions and 77 deletions
+2
View File
@@ -27,6 +27,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
- socks5 client: graceful shutdown should fix error on disconnect in nym-connect ([#1591])
- wasm-client: fixed build errors on MacOS and changed example JS code to use mainnet ([#1585])
- gateway-client: will attempt to read now as many as 8 websocket messages at once, assuming they're already available on the socket ([#1669])
- clients: bound the sphinx packet channel and reduce sending rate if gateway can't keep up ([#1703])
[#1541]: https://github.com/nymtech/nym/pull/1541
[#1558]: https://github.com/nymtech/nym/pull/1558
@@ -41,6 +42,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
[#1671]: https://github.com/nymtech/nym/pull/1671
[#1673]: https://github.com/nymtech/nym/pull/1673
[#1702]: https://github.com/nymtech/nym/pull/1702
[#1703]: https://github.com/nymtech/nym/pull/1703
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
@@ -16,6 +16,7 @@ use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
@@ -171,11 +172,18 @@ impl LoopCoverTrafficStream<OsRng> {
)
.expect("Somehow failed to generate a loop cover message with a valid topology");
// if this one fails, there's no retrying because it means that either:
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
match err {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
log::debug!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
log::warn!("Failed to send cover message - channel closed");
}
}
}
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
+17 -15
View File
@@ -2,15 +2,15 @@
// SPDX-License-Identifier: Apache-2.0
use crate::spawn_future;
use futures::channel::mpsc;
use futures::StreamExt;
use gateway_client::GatewayClient;
use log::*;
use nymsphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
const MAX_FAILURE_COUNT: usize = 100;
pub struct MixTrafficController {
@@ -25,15 +25,17 @@ pub struct MixTrafficController {
}
impl MixTrafficController {
pub fn new(
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
) -> MixTrafficController {
MixTrafficController {
gateway_client,
mix_rx,
consecutive_gateway_failure_count: 0,
}
pub fn new(gateway_client: GatewayClient) -> (MixTrafficController, BatchMixMessageSender) {
let (sphinx_message_sender, sphinx_message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
(
MixTrafficController {
gateway_client,
mix_rx: sphinx_message_receiver,
consecutive_gateway_failure_count: 0,
},
sphinx_message_sender,
)
}
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
@@ -72,7 +74,7 @@ impl MixTrafficController {
while !shutdown.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.next() => match mix_packets {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
self.on_messages(mix_packets).await;
},
@@ -96,7 +98,7 @@ impl MixTrafficController {
spawn_future(async move {
debug!("Started MixTrafficController without graceful shutdown support");
while let Some(mix_packets) = self.mix_rx.next().await {
while let Some(mix_packets) = self.mix_rx.recv().await {
self.on_messages(mix_packets).await;
}
})
@@ -27,6 +27,23 @@ use tokio::time;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
// the available buffer space we want to take somewhat swift action, but we still need to give a
// short time to give the channel a chance reduce pressure.
const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
// The minimum time between decreasing the average delay between packets. We don't want to change
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
// wait a little to see the effect before we decrease further.
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
// If we enough time passes without any sign of backpressure in the channel, we can consider
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
// bandwidth at all times.
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
// The maximum multiplier we apply to the base average Poisson delay.
const MAX_DELAY_MULTIPLIER: u32 = 6;
// The minium multiplier we apply to the base average Poisson delay.
const MIN_DELAY_MULTIPLIER: u32 = 1;
/// Configurable parameters of the `OutQueueControl`
pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
@@ -68,6 +85,101 @@ impl Config {
}
}
struct SendingDelayController {
/// Multiply the average sending delay.
/// This is normally set to unity, but if we detect backpressure we increase this
/// multiplier. We use discrete steps.
current_multiplier: u32,
/// Maximum delay multiplier
upper_bound: u32,
/// Minimum delay multiplier
lower_bound: u32,
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
#[cfg(not(target_arch = "wasm32"))]
time_when_changed: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_changed: wasm_timer::Instant,
/// If we have a long enough time without any backpressure detected we try reducing the sending
/// delay multiplier
#[cfg(not(target_arch = "wasm32"))]
time_when_backpressure_detected: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_backpressure_detected: wasm_timer::Instant,
}
#[cfg(not(target_arch = "wasm32"))]
fn get_time_now() -> time::Instant {
time::Instant::now()
}
#[cfg(target_arch = "wasm32")]
fn get_time_now() -> wasm_timer::Instant {
wasm_timer::Instant::now()
}
impl SendingDelayController {
fn new(lower_bound: u32, upper_bound: u32) -> Self {
assert!(lower_bound <= upper_bound);
let now = get_time_now();
SendingDelayController {
current_multiplier: MIN_DELAY_MULTIPLIER,
upper_bound,
lower_bound,
time_when_changed: now,
time_when_backpressure_detected: now,
}
}
fn current_multiplier(&self) -> u32 {
self.current_multiplier
}
fn increase_delay_multiplier(&mut self) {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
}
fn decrease_delay_multiplier(&mut self) {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
}
fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
fn not_increased_delay_recently(&self) -> bool {
get_time_now()
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
}
fn is_sending_reliable(&self) -> bool {
let now = get_time_now();
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
let acceptable_time_without_backpressure =
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
&& now > self.time_when_changed + delay_change_interval
}
}
pub(crate) struct OutQueueControl<R>
where
R: CryptoRng + Rng,
@@ -89,6 +201,10 @@ where
#[cfg(target_arch = "wasm32")]
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
// To make sure we don't overload the mix_tx channel, we limit the rate we are pushing
// messages.
sending_rate_controller: SendingDelayController,
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays.
mix_tx: BatchMixMessageSender,
@@ -156,6 +272,10 @@ where
ack_key,
sent_notifier,
next_delay: None,
sending_rate_controller: SendingDelayController::new(
MIN_DELAY_MULTIPLIER,
MAX_DELAY_MULTIPLIER,
),
mix_tx,
real_receiver,
our_full_destination,
@@ -212,15 +332,8 @@ where
}
};
// if this one fails, there's no retrying because it means that either:
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
log::warn!(
"Failed to send {} packets (possible process shutdown?)",
err.into_inner().len()
);
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
log::error!("Failed to send - channel closed: {}", err);
}
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
@@ -234,7 +347,44 @@ where
tokio::task::yield_now().await;
}
fn current_average_message_sending_delay(&self) -> Duration {
self.config.average_message_sending_delay
* self.sending_rate_controller.current_multiplier()
}
fn adjust_current_average_message_sending_delay(&mut self) {
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
log::trace!(
"used_slots: {used_slots}, current_multiplier: {}",
self.sending_rate_controller.current_multiplier()
);
// Even just a single used slot is enough to signal backpressure
if used_slots > 0 {
log::trace!("Backpressure detected");
self.sending_rate_controller.record_backpressure_detected();
}
// If the buffer is running out, slow down the sending rate
if self.mix_tx.capacity() == 0
&& self.sending_rate_controller.not_increased_delay_recently()
{
self.sending_rate_controller.increase_delay_multiplier();
}
// Very carefully step up the sending rate in case it seems like we can solidly handle the
// current rate.
if self.sending_rate_controller.is_sending_reliable() {
self.sending_rate_controller.decrease_delay_multiplier();
}
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected.
self.adjust_current_average_message_sending_delay();
let avg_delay = self.current_average_message_sending_delay();
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
@@ -243,7 +393,6 @@ where
// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let avg_delay = self.config.average_message_sending_delay;
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
// The next interval value is `next_poisson_delay` after the one that just
+12 -17
View File
@@ -6,9 +6,7 @@ use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
};
use client_core::client::key_manager::KeyManager;
use client_core::client::mix_traffic::{
BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController,
};
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use client_core::client::real_messages_control;
use client_core::client::real_messages_control::RealMessagesController;
use client_core::client::received_buffer::{
@@ -263,13 +261,13 @@ impl NymClient {
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
// requests?
fn start_mix_traffic_controller(
&mut self,
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
shutdown: ShutdownListener,
) {
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
MixTrafficController::new(mix_rx, gateway_client).start_with_shutdown(shutdown);
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
fn start_websocket_listener(
@@ -356,11 +354,6 @@ impl NymClient {
// rather than creating them here, so say for example the buffer controller would create the request channels
// and would allow anyone to clone the sender channel
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
// they are used by cover traffic stream and real traffic stream
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
@@ -397,11 +390,13 @@ impl NymClient {
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
.await;
self.start_mix_traffic_controller(
sphinx_message_receiver,
gateway_client,
shutdown.subscribe(),
);
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
+12 -17
View File
@@ -13,9 +13,7 @@ use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
};
use client_core::client::key_manager::KeyManager;
use client_core::client::mix_traffic::{
BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController,
};
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use client_core::client::real_messages_control::RealMessagesController;
use client_core::client::received_buffer::{
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
@@ -263,13 +261,13 @@ impl NymClient {
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
// requests?
fn start_mix_traffic_controller(
&mut self,
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
shutdown: ShutdownListener,
) {
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
MixTrafficController::new(mix_rx, gateway_client).start_with_shutdown(shutdown);
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
fn start_socks5_listener(
@@ -345,11 +343,6 @@ impl NymClient {
// rather than creating them here, so say for example the buffer controller would create the request channels
// and would allow anyone to clone the sender channel
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
// they are used by cover traffic stream and real traffic stream
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
@@ -386,11 +379,13 @@ impl NymClient {
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
.await;
self.start_mix_traffic_controller(
sphinx_message_receiver,
gateway_client,
shutdown.subscribe(),
);
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
+11 -13
View File
@@ -6,7 +6,7 @@ use client_core::client::{
cover_traffic_stream::LoopCoverTrafficStream,
inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender},
key_manager::KeyManager,
mix_traffic::{BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController},
mix_traffic::{BatchMixMessageSender, MixTrafficController},
real_messages_control::{self, RealMessagesController},
received_buffer::{
ReceivedBufferMessage, ReceivedBufferRequestReceiver, ReceivedBufferRequestSender,
@@ -253,13 +253,11 @@ impl NymClient {
// TODO: if we want to send control messages to gateway_client, this CAN'T take the ownership
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
// requests?
fn start_mix_traffic_controller(
&mut self,
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
) {
fn start_mix_traffic_controller(gateway_client: GatewayClient) -> BatchMixMessageSender {
console_log!("Starting mix traffic controller...");
MixTrafficController::new(mix_rx, gateway_client).start();
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
mix_traffic_controller.start();
mix_tx
}
// TODO: this procedure is extremely overcomplicated, because it's based off native client's behaviour
@@ -307,11 +305,6 @@ impl NymClient {
// rather than creating them here, so say for example the buffer controller would create the request channels
// and would allow anyone to clone the sender channel
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
// they are used by cover traffic stream and real traffic stream
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
@@ -339,7 +332,12 @@ impl NymClient {
.start_gateway_client(mixnet_messages_sender, ack_sender)
.await;
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let sphinx_message_sender = Self::start_mix_traffic_controller(gateway_client);
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
ack_receiver,