Compare commits

...

1 Commits

Author SHA1 Message Date
Jon Häggblad ecb53cb0ed client-core: allow increasing sending rate with number of connections 2022-11-30 13:05:23 +01:00
7 changed files with 110 additions and 12 deletions
Generated
+24
View File
@@ -599,6 +599,7 @@ dependencies = [
"humantime-serde",
"log",
"nonexhaustive-delayqueue",
"num-rational",
"nymsphinx",
"pemstore",
"rand 0.7.3",
@@ -3056,6 +3057,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg 1.1.0",
"num-integer",
"num-traits",
]
[[package]]
name = "num-derive"
version = "0.3.3"
@@ -3077,6 +3089,18 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
dependencies = [
"autocfg 1.1.0",
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
+1
View File
@@ -32,6 +32,7 @@ pemstore = { path = "../../common/pemstore" }
topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client", default-features = false }
task = { path = "../../common/task" }
num-rational = "0.4.1"
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.9"
@@ -185,6 +185,7 @@ impl<'a> BaseClientBuilder<'a> {
self.debug_config.ack_wait_addition,
self.debug_config.average_ack_delay,
self.debug_config.message_sending_average_delay,
self.debug_config.scale_sending_rate_with_no_connections,
self.debug_config.average_packet_delay,
self.debug_config.disable_main_poisson_packet_distribution,
self.as_mix_recipient(),
@@ -50,6 +50,8 @@ pub struct Config {
/// Average delay between sending subsequent packets from this client.
average_message_sending_delay: Duration,
scale_sending_rate_with_no_connections: bool,
/// Average delay a data packet is going to get delayed at a single mixnode.
average_packet_delay_duration: Duration,
@@ -73,6 +75,7 @@ impl Config {
ack_wait_addition: Duration,
average_ack_delay_duration: Duration,
average_message_sending_delay: Duration,
scale_sending_rate_with_no_connections: bool,
average_packet_delay_duration: Duration,
disable_main_poisson_packet_distribution: bool,
self_recipient: Recipient,
@@ -83,6 +86,7 @@ impl Config {
ack_wait_multiplier,
self_recipient,
average_message_sending_delay,
scale_sending_rate_with_no_connections,
average_packet_delay_duration,
average_ack_delay_duration,
disable_main_poisson_packet_distribution,
@@ -152,6 +156,7 @@ impl RealMessagesController<OsRng> {
config.average_ack_delay_duration,
config.average_packet_delay_duration,
config.average_message_sending_delay,
config.scale_sending_rate_with_no_connections,
config.disable_main_poisson_packet_distribution,
)
.with_custom_cover_packet_size(config.packet_size);
@@ -56,6 +56,8 @@ pub(crate) struct Config {
/// Average delay between sending subsequent packets.
average_message_sending_delay: Duration,
scale_sending_rate_with_no_connections: bool,
/// Controls whether the stream constantly produces packets according to the predefined
/// poisson distribution.
disable_poisson_packet_distribution: bool,
@@ -69,12 +71,14 @@ impl Config {
average_ack_delay: Duration,
average_packet_delay: Duration,
average_message_sending_delay: Duration,
scale_sending_rate_with_no_connections: bool,
disable_poisson_packet_distribution: bool,
) -> Self {
Config {
average_ack_delay,
average_packet_delay,
average_message_sending_delay,
scale_sending_rate_with_no_connections,
disable_poisson_packet_distribution,
cover_packet_size: Default::default(),
}
@@ -285,8 +289,14 @@ where
}
fn current_average_message_sending_delay(&self) -> Duration {
self.config.average_message_sending_delay
* self.sending_delay_controller.current_multiplier()
let current_numer = *self.sending_delay_controller.current_multiplier().numer() as f64;
let current_denom = *self.sending_delay_controller.current_multiplier().denom() as f64;
self.config
.average_message_sending_delay
.mul_f64(current_numer / current_denom)
//self.config.average_message_sending_delay
//* self.sending_delay_controller.current_multiplier()
}
fn adjust_current_average_message_sending_delay(&mut self) {
@@ -306,13 +316,24 @@ where
if self.mix_tx.capacity() == 0
&& self.sending_delay_controller.not_increased_delay_recently()
{
self.sending_delay_controller.increase_delay_multiplier();
if self.config.scale_sending_rate_with_no_connections {
self.sending_delay_controller
.increase_delay_multiplier_with_connections();
} else {
self.sending_delay_controller.increase_delay_multiplier();
}
}
// Very carefully step up the sending rate in case it seems like we can solidly handle the
// current rate.
if self.sending_delay_controller.is_sending_reliable() {
self.sending_delay_controller.decrease_delay_multiplier();
if self.config.scale_sending_rate_with_no_connections {
let no_connections = self.transmission_buffer.connections().len();
self.sending_delay_controller
.decrease_delay_multiplier_with_connections(no_connections);
} else {
self.sending_delay_controller.decrease_delay_multiplier();
}
}
}
@@ -487,7 +508,9 @@ where
#[cfg(not(target_arch = "wasm32"))]
fn log_status_infrequent(&self) {
if self.sending_delay_controller.current_multiplier() > 1 {
use num_rational::Rational64;
if self.sending_delay_controller.current_multiplier() > Rational64::from_integer(1) {
log::warn!(
"Unable to send packets fast enough - sending delay multiplier set to: {}",
self.sending_delay_controller.current_multiplier()
@@ -4,6 +4,7 @@
use super::get_time_now;
use std::time::Duration;
use num_rational::Rational64;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
#[cfg(target_arch = "wasm32")]
@@ -30,13 +31,13 @@ pub(crate) struct SendingDelayController {
/// Multiply the average sending delay.
/// This is normally set to unity, but if we detect backpressure we increase this
/// multiplier. We use discrete steps.
current_multiplier: u32,
current_multiplier: Rational64,
/// Maximum delay multiplier
upper_bound: u32,
upper_bound: Rational64,
/// Minimum delay multiplier
lower_bound: u32,
lower_bound: Rational64,
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
#[cfg(not(target_arch = "wasm32"))]
@@ -65,15 +66,15 @@ impl SendingDelayController {
assert!(lower_bound <= upper_bound);
let now = get_time_now();
SendingDelayController {
current_multiplier: MIN_DELAY_MULTIPLIER,
upper_bound,
lower_bound,
current_multiplier: Rational64::from_integer(MIN_DELAY_MULTIPLIER.into()),
upper_bound: Rational64::from_integer(upper_bound.into()),
lower_bound: Rational64::from_integer(lower_bound.into()),
time_when_changed: now,
time_when_backpressure_detected: now,
}
}
pub(crate) fn current_multiplier(&self) -> u32 {
pub(crate) fn current_multiplier(&self) -> Rational64 {
self.current_multiplier
}
@@ -91,6 +92,24 @@ impl SendingDelayController {
}
}
pub(crate) fn increase_delay_multiplier_with_connections(&mut self) {
if self.current_multiplier < self.upper_bound {
if self.current_multiplier < Rational64::from_integer(1) {
self.current_multiplier *= 2;
} else {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
}
self.time_when_changed = get_time_now();
log::warn!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
} else {
log::warn!("Trying to increase delay multipler higher than allowed");
}
}
pub(crate) fn decrease_delay_multiplier(&mut self) {
if self.current_multiplier > self.lower_bound {
self.current_multiplier =
@@ -103,6 +122,27 @@ impl SendingDelayController {
}
}
pub(crate) fn decrease_delay_multiplier_with_connections(
&mut self,
number_of_connections: usize,
) {
let lower_bound = Rational64::new(1, number_of_connections.try_into().unwrap());
log::info!("lower_bound: {}", lower_bound);
if self.current_multiplier > lower_bound {
if self.current_multiplier > Rational64::from_integer(1) {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
} else {
self.current_multiplier /= 2;
}
}
log::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
}
pub(crate) fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
+4
View File
@@ -474,6 +474,9 @@ pub struct DebugConfig {
#[serde(with = "humantime_serde")]
pub message_sending_average_delay: Duration,
/// Controls if we should increase the sending rate with the number of connections
pub scale_sending_rate_with_no_connections: bool,
/// How long we're willing to wait for a response to a message sent to the gateway,
/// before giving up on it.
#[serde(with = "humantime_serde")]
@@ -519,6 +522,7 @@ impl Default for DebugConfig {
ack_wait_addition: DEFAULT_ACK_WAIT_ADDITION,
loop_cover_traffic_average_delay: DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY,
message_sending_average_delay: DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY,
scale_sending_rate_with_no_connections: false,
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,