Compare commits

...

3 Commits

Author SHA1 Message Date
Simon Wicky e0b8638163 add cover timing and ack logging 2023-04-20 09:09:12 +00:00
Jon Häggblad fdb5727e5a Rework Poisson process throttling 2023-04-17 13:02:40 +00:00
Simon Wicky 3da131de15 add packets logging 2023-04-17 14:44:05 +02:00
5 changed files with 87 additions and 22 deletions
@@ -10,6 +10,7 @@ use nym_sphinx::{
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
@@ -48,10 +49,20 @@ impl AcknowledgementListener {
// because nothing was inserted in the first place
if frag_id == COVER_FRAG_ID {
trace!("Received an ack for a cover message - no need to do anything");
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("cover ack : _{}", time);
return;
}
trace!("Received {} from the mix network", frag_id);
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("real ack : /{:?}/{}", frag_id, time);
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
@@ -9,6 +9,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_task::connections::TransmissionLane;
use rand::{CryptoRng, Rng};
use std::time::{SystemTime, UNIX_EPOCH};
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
@@ -48,6 +49,11 @@ where
lane: TransmissionLane,
) {
// offload reply handling to the dedicated task
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("sending reply: _{}_{}", time, data.len());
self.reply_controller_sender
.send_reply(recipient_tag, data, lane)
}
@@ -58,6 +64,11 @@ where
content: Vec<u8>,
lane: TransmissionLane,
) {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("sending plain: _{}_{}", time, content.len());
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane)
@@ -74,6 +85,11 @@ where
reply_surbs: u32,
lane: TransmissionLane,
) {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("sending anonymous: _{}_{}", time, content.len());
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane)
@@ -24,6 +24,7 @@ use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
@@ -232,7 +233,11 @@ where
return;
}
};
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("cover sent_{}_{:?}",time, self.config.cover_packet_size.size());
(
generate_loop_cover_packet(
&mut self.rng,
@@ -250,6 +255,11 @@ where
)
}
StreamMessage::Real(real_message) => {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("real sent: /{:?}/{}/{}", real_message.fragment_id, time, real_message.mix_packet.sphinx_packet().payload.len());
(real_message.mix_packet, Some(real_message.fragment_id))
}
};
@@ -297,22 +307,29 @@ where
self.sending_delay_controller.current_multiplier()
);
// Even just a single used slot is enough to signal backpressure
if used_slots > 0 {
if self
.sending_delay_controller
.is_backpressure_currently_detected(used_slots)
{
log::trace!("Backpressure detected");
self.sending_delay_controller.record_backpressure_detected();
}
// If the buffer is running out, slow down the sending rate
// If the buffer is running out, slow down the sending rate by increasing the delay
// multiplier.
if self.mix_tx.capacity() == 0
&& self.sending_delay_controller.not_increased_delay_recently()
{
self.sending_delay_controller.increase_delay_multiplier();
}
// Very carefully step up the sending rate in case it seems like we can solidly handle the
// current rate.
if self.sending_delay_controller.is_sending_reliable() {
// If it looks like we are sending reliably, increase the sending rate by decreasing the
// sending delay multiplier.
if !self
.sending_delay_controller
.was_backpressure_detected_recently()
&& self.sending_delay_controller.not_decreased_delay_recently()
{
self.sending_delay_controller.decrease_delay_multiplier();
}
}
@@ -11,11 +11,14 @@ const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
// The minimum time between decreasing the average delay between packets. We don't want to change
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
// wait a little to see the effect before we decrease further.
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 3;
// The queue length that is required for us to register that backpressure occured. If there are
// more than this many packets waiting to be sent, we consider the channel to be under
// backpressure.
const BACKPRESSURE_THRESHOLD: usize = 10;
// If we enough time passes without any sign of backpressure in the channel, we can consider
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
// bandwidth at all times.
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
// lowering the average delay.
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 1;
// The maximum multiplier we apply to the base average Poisson delay.
const MAX_DELAY_MULTIPLIER: u32 = 6;
// The minium multiplier we apply to the base average Poisson delay.
@@ -100,22 +103,27 @@ impl SendingDelayController {
}
}
pub(crate) fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
pub(crate) fn not_increased_delay_recently(&self) -> bool {
get_time_now()
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
}
pub(crate) fn is_sending_reliable(&self) -> bool {
let now = get_time_now();
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
let acceptable_time_without_backpressure =
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
pub(crate) fn not_decreased_delay_recently(&self) -> bool {
get_time_now()
> self.time_when_changed + Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
}
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
&& now > self.time_when_changed + delay_change_interval
pub(crate) fn is_backpressure_currently_detected(&self, queue_length: usize) -> bool {
queue_length > BACKPRESSURE_THRESHOLD
}
pub(crate) fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
pub(crate) fn was_backpressure_detected_recently(&self) -> bool {
get_time_now()
< self.time_when_backpressure_detected
+ Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS)
}
}
@@ -20,6 +20,7 @@ use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
// Buffer Requests to say "hey, send any reconstructed messages to this channel"
// or to say "hey, I'm going offline, don't send anything more to me. Just buffer them instead"
@@ -47,8 +48,14 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
fn recover_from_fragment(&mut self, fragment_data: &[u8]) -> Option<NymMessage> {
let fragment_len = fragment_data.len();
if nym_sphinx::cover::is_cover(fragment_data) {
trace!("The message was a loop cover message! Skipping it");
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("Cover received_{}_{}",time, fragment_len);
return None;
}
@@ -60,6 +67,12 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
Ok(frag) => frag,
};
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
println!("real received : _{:?}_{}_{}_{}", &fragment.id(),&fragment.current_fragment(),time, fragment_len);
if self.recently_reconstructed.contains(&fragment.id()) {
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
return None;