Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ecb53cb0ed |
Generated
+24
@@ -599,6 +599,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"log",
|
||||
"nonexhaustive-delayqueue",
|
||||
"num-rational",
|
||||
"nymsphinx",
|
||||
"pemstore",
|
||||
"rand 0.7.3",
|
||||
@@ -3056,6 +3057,17 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
|
||||
dependencies = [
|
||||
"autocfg 1.1.0",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-derive"
|
||||
version = "0.3.3"
|
||||
@@ -3077,6 +3089,18 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-rational"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
|
||||
dependencies = [
|
||||
"autocfg 1.1.0",
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.14"
|
||||
|
||||
@@ -32,6 +32,7 @@ pemstore = { path = "../../common/pemstore" }
|
||||
topology = { path = "../../common/topology" }
|
||||
validator-client = { path = "../../common/client-libs/validator-client", default-features = false }
|
||||
task = { path = "../../common/task" }
|
||||
num-rational = "0.4.1"
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||
version = "0.1.9"
|
||||
|
||||
@@ -185,6 +185,7 @@ impl<'a> BaseClientBuilder<'a> {
|
||||
self.debug_config.ack_wait_addition,
|
||||
self.debug_config.average_ack_delay,
|
||||
self.debug_config.message_sending_average_delay,
|
||||
self.debug_config.scale_sending_rate_with_no_connections,
|
||||
self.debug_config.average_packet_delay,
|
||||
self.debug_config.disable_main_poisson_packet_distribution,
|
||||
self.as_mix_recipient(),
|
||||
|
||||
@@ -50,6 +50,8 @@ pub struct Config {
|
||||
/// Average delay between sending subsequent packets from this client.
|
||||
average_message_sending_delay: Duration,
|
||||
|
||||
scale_sending_rate_with_no_connections: bool,
|
||||
|
||||
/// Average delay a data packet is going to get delayed at a single mixnode.
|
||||
average_packet_delay_duration: Duration,
|
||||
|
||||
@@ -73,6 +75,7 @@ impl Config {
|
||||
ack_wait_addition: Duration,
|
||||
average_ack_delay_duration: Duration,
|
||||
average_message_sending_delay: Duration,
|
||||
scale_sending_rate_with_no_connections: bool,
|
||||
average_packet_delay_duration: Duration,
|
||||
disable_main_poisson_packet_distribution: bool,
|
||||
self_recipient: Recipient,
|
||||
@@ -83,6 +86,7 @@ impl Config {
|
||||
ack_wait_multiplier,
|
||||
self_recipient,
|
||||
average_message_sending_delay,
|
||||
scale_sending_rate_with_no_connections,
|
||||
average_packet_delay_duration,
|
||||
average_ack_delay_duration,
|
||||
disable_main_poisson_packet_distribution,
|
||||
@@ -152,6 +156,7 @@ impl RealMessagesController<OsRng> {
|
||||
config.average_ack_delay_duration,
|
||||
config.average_packet_delay_duration,
|
||||
config.average_message_sending_delay,
|
||||
config.scale_sending_rate_with_no_connections,
|
||||
config.disable_main_poisson_packet_distribution,
|
||||
)
|
||||
.with_custom_cover_packet_size(config.packet_size);
|
||||
|
||||
@@ -56,6 +56,8 @@ pub(crate) struct Config {
|
||||
/// Average delay between sending subsequent packets.
|
||||
average_message_sending_delay: Duration,
|
||||
|
||||
scale_sending_rate_with_no_connections: bool,
|
||||
|
||||
/// Controls whether the stream constantly produces packets according to the predefined
|
||||
/// poisson distribution.
|
||||
disable_poisson_packet_distribution: bool,
|
||||
@@ -69,12 +71,14 @@ impl Config {
|
||||
average_ack_delay: Duration,
|
||||
average_packet_delay: Duration,
|
||||
average_message_sending_delay: Duration,
|
||||
scale_sending_rate_with_no_connections: bool,
|
||||
disable_poisson_packet_distribution: bool,
|
||||
) -> Self {
|
||||
Config {
|
||||
average_ack_delay,
|
||||
average_packet_delay,
|
||||
average_message_sending_delay,
|
||||
scale_sending_rate_with_no_connections,
|
||||
disable_poisson_packet_distribution,
|
||||
cover_packet_size: Default::default(),
|
||||
}
|
||||
@@ -285,8 +289,14 @@ where
|
||||
}
|
||||
|
||||
fn current_average_message_sending_delay(&self) -> Duration {
|
||||
self.config.average_message_sending_delay
|
||||
* self.sending_delay_controller.current_multiplier()
|
||||
let current_numer = *self.sending_delay_controller.current_multiplier().numer() as f64;
|
||||
let current_denom = *self.sending_delay_controller.current_multiplier().denom() as f64;
|
||||
self.config
|
||||
.average_message_sending_delay
|
||||
.mul_f64(current_numer / current_denom)
|
||||
|
||||
//self.config.average_message_sending_delay
|
||||
//* self.sending_delay_controller.current_multiplier()
|
||||
}
|
||||
|
||||
fn adjust_current_average_message_sending_delay(&mut self) {
|
||||
@@ -306,13 +316,24 @@ where
|
||||
if self.mix_tx.capacity() == 0
|
||||
&& self.sending_delay_controller.not_increased_delay_recently()
|
||||
{
|
||||
self.sending_delay_controller.increase_delay_multiplier();
|
||||
if self.config.scale_sending_rate_with_no_connections {
|
||||
self.sending_delay_controller
|
||||
.increase_delay_multiplier_with_connections();
|
||||
} else {
|
||||
self.sending_delay_controller.increase_delay_multiplier();
|
||||
}
|
||||
}
|
||||
|
||||
// Very carefully step up the sending rate in case it seems like we can solidly handle the
|
||||
// current rate.
|
||||
if self.sending_delay_controller.is_sending_reliable() {
|
||||
self.sending_delay_controller.decrease_delay_multiplier();
|
||||
if self.config.scale_sending_rate_with_no_connections {
|
||||
let no_connections = self.transmission_buffer.connections().len();
|
||||
self.sending_delay_controller
|
||||
.decrease_delay_multiplier_with_connections(no_connections);
|
||||
} else {
|
||||
self.sending_delay_controller.decrease_delay_multiplier();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -487,7 +508,9 @@ where
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn log_status_infrequent(&self) {
|
||||
if self.sending_delay_controller.current_multiplier() > 1 {
|
||||
use num_rational::Rational64;
|
||||
|
||||
if self.sending_delay_controller.current_multiplier() > Rational64::from_integer(1) {
|
||||
log::warn!(
|
||||
"Unable to send packets fast enough - sending delay multiplier set to: {}",
|
||||
self.sending_delay_controller.current_multiplier()
|
||||
|
||||
+47
-7
@@ -4,6 +4,7 @@
|
||||
use super::get_time_now;
|
||||
use std::time::Duration;
|
||||
|
||||
use num_rational::Rational64;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -30,13 +31,13 @@ pub(crate) struct SendingDelayController {
|
||||
/// Multiply the average sending delay.
|
||||
/// This is normally set to unity, but if we detect backpressure we increase this
|
||||
/// multiplier. We use discrete steps.
|
||||
current_multiplier: u32,
|
||||
current_multiplier: Rational64,
|
||||
|
||||
/// Maximum delay multiplier
|
||||
upper_bound: u32,
|
||||
upper_bound: Rational64,
|
||||
|
||||
/// Minimum delay multiplier
|
||||
lower_bound: u32,
|
||||
lower_bound: Rational64,
|
||||
|
||||
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -65,15 +66,15 @@ impl SendingDelayController {
|
||||
assert!(lower_bound <= upper_bound);
|
||||
let now = get_time_now();
|
||||
SendingDelayController {
|
||||
current_multiplier: MIN_DELAY_MULTIPLIER,
|
||||
upper_bound,
|
||||
lower_bound,
|
||||
current_multiplier: Rational64::from_integer(MIN_DELAY_MULTIPLIER.into()),
|
||||
upper_bound: Rational64::from_integer(upper_bound.into()),
|
||||
lower_bound: Rational64::from_integer(lower_bound.into()),
|
||||
time_when_changed: now,
|
||||
time_when_backpressure_detected: now,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn current_multiplier(&self) -> u32 {
|
||||
pub(crate) fn current_multiplier(&self) -> Rational64 {
|
||||
self.current_multiplier
|
||||
}
|
||||
|
||||
@@ -91,6 +92,24 @@ impl SendingDelayController {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn increase_delay_multiplier_with_connections(&mut self) {
|
||||
if self.current_multiplier < self.upper_bound {
|
||||
if self.current_multiplier < Rational64::from_integer(1) {
|
||||
self.current_multiplier *= 2;
|
||||
} else {
|
||||
self.current_multiplier =
|
||||
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
|
||||
}
|
||||
self.time_when_changed = get_time_now();
|
||||
log::warn!(
|
||||
"Increasing sending delay multiplier to: {}",
|
||||
self.current_multiplier
|
||||
);
|
||||
} else {
|
||||
log::warn!("Trying to increase delay multipler higher than allowed");
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn decrease_delay_multiplier(&mut self) {
|
||||
if self.current_multiplier > self.lower_bound {
|
||||
self.current_multiplier =
|
||||
@@ -103,6 +122,27 @@ impl SendingDelayController {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn decrease_delay_multiplier_with_connections(
|
||||
&mut self,
|
||||
number_of_connections: usize,
|
||||
) {
|
||||
let lower_bound = Rational64::new(1, number_of_connections.try_into().unwrap());
|
||||
log::info!("lower_bound: {}", lower_bound);
|
||||
|
||||
if self.current_multiplier > lower_bound {
|
||||
if self.current_multiplier > Rational64::from_integer(1) {
|
||||
self.current_multiplier =
|
||||
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
|
||||
} else {
|
||||
self.current_multiplier /= 2;
|
||||
}
|
||||
}
|
||||
log::debug!(
|
||||
"Decreasing sending delay multiplier to: {}",
|
||||
self.current_multiplier
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn record_backpressure_detected(&mut self) {
|
||||
self.time_when_backpressure_detected = get_time_now();
|
||||
}
|
||||
|
||||
@@ -474,6 +474,9 @@ pub struct DebugConfig {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub message_sending_average_delay: Duration,
|
||||
|
||||
/// Controls if we should increase the sending rate with the number of connections
|
||||
pub scale_sending_rate_with_no_connections: bool,
|
||||
|
||||
/// How long we're willing to wait for a response to a message sent to the gateway,
|
||||
/// before giving up on it.
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -519,6 +522,7 @@ impl Default for DebugConfig {
|
||||
ack_wait_addition: DEFAULT_ACK_WAIT_ADDITION,
|
||||
loop_cover_traffic_average_delay: DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY,
|
||||
message_sending_average_delay: DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY,
|
||||
scale_sending_rate_with_no_connections: false,
|
||||
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
|
||||
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
|
||||
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
|
||||
|
||||
Reference in New Issue
Block a user