Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e0b8638163 | |||
| fdb5727e5a | |||
| 3da131de15 |
+11
@@ -10,6 +10,7 @@ use nym_sphinx::{
|
||||
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
/// Module responsible for listening for any data resembling acknowledgements from the network
|
||||
/// and firing actions to remove them from the 'Pending' state.
|
||||
@@ -48,10 +49,20 @@ impl AcknowledgementListener {
|
||||
// because nothing was inserted in the first place
|
||||
if frag_id == COVER_FRAG_ID {
|
||||
trace!("Received an ack for a cover message - no need to do anything");
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("cover ack : _{}", time);
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("Received {} from the mix network", frag_id);
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("real ack : /{:?}/{}", frag_id, time);
|
||||
|
||||
self.action_sender
|
||||
.unbounded_send(Action::new_remove(frag_id))
|
||||
|
||||
+16
@@ -9,6 +9,7 @@ use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
|
||||
/// putting everything into sphinx packets, etc.
|
||||
@@ -48,6 +49,11 @@ where
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
// offload reply handling to the dedicated task
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("sending reply: _{}_{}", time, data.len());
|
||||
self.reply_controller_sender
|
||||
.send_reply(recipient_tag, data, lane)
|
||||
}
|
||||
@@ -58,6 +64,11 @@ where
|
||||
content: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("sending plain: _{}_{}", time, content.len());
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_plain_message(recipient, content, lane)
|
||||
@@ -74,6 +85,11 @@ where
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("sending anonymous: _{}_{}", time, content.len());
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane)
|
||||
|
||||
@@ -24,6 +24,7 @@ use rand::{CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time;
|
||||
@@ -232,7 +233,11 @@ where
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("cover sent_{}_{:?}",time, self.config.cover_packet_size.size());
|
||||
(
|
||||
generate_loop_cover_packet(
|
||||
&mut self.rng,
|
||||
@@ -250,6 +255,11 @@ where
|
||||
)
|
||||
}
|
||||
StreamMessage::Real(real_message) => {
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("real sent: /{:?}/{}/{}", real_message.fragment_id, time, real_message.mix_packet.sphinx_packet().payload.len());
|
||||
(real_message.mix_packet, Some(real_message.fragment_id))
|
||||
}
|
||||
};
|
||||
@@ -297,22 +307,29 @@ where
|
||||
self.sending_delay_controller.current_multiplier()
|
||||
);
|
||||
|
||||
// Even just a single used slot is enough to signal backpressure
|
||||
if used_slots > 0 {
|
||||
if self
|
||||
.sending_delay_controller
|
||||
.is_backpressure_currently_detected(used_slots)
|
||||
{
|
||||
log::trace!("Backpressure detected");
|
||||
self.sending_delay_controller.record_backpressure_detected();
|
||||
}
|
||||
|
||||
// If the buffer is running out, slow down the sending rate
|
||||
// If the buffer is running out, slow down the sending rate by increasing the delay
|
||||
// multiplier.
|
||||
if self.mix_tx.capacity() == 0
|
||||
&& self.sending_delay_controller.not_increased_delay_recently()
|
||||
{
|
||||
self.sending_delay_controller.increase_delay_multiplier();
|
||||
}
|
||||
|
||||
// Very carefully step up the sending rate in case it seems like we can solidly handle the
|
||||
// current rate.
|
||||
if self.sending_delay_controller.is_sending_reliable() {
|
||||
// If it looks like we are sending reliably, increase the sending rate by decreasing the
|
||||
// sending delay multiplier.
|
||||
if !self
|
||||
.sending_delay_controller
|
||||
.was_backpressure_detected_recently()
|
||||
&& self.sending_delay_controller.not_decreased_delay_recently()
|
||||
{
|
||||
self.sending_delay_controller.decrease_delay_multiplier();
|
||||
}
|
||||
}
|
||||
|
||||
+23
-15
@@ -11,11 +11,14 @@ const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
|
||||
// The minimum time between decreasing the average delay between packets. We don't want to change
|
||||
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
|
||||
// wait a little to see the effect before we decrease further.
|
||||
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
|
||||
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 3;
|
||||
// The queue length that is required for us to register that backpressure occured. If there are
|
||||
// more than this many packets waiting to be sent, we consider the channel to be under
|
||||
// backpressure.
|
||||
const BACKPRESSURE_THRESHOLD: usize = 10;
|
||||
// If we enough time passes without any sign of backpressure in the channel, we can consider
|
||||
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
|
||||
// bandwidth at all times.
|
||||
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
|
||||
// lowering the average delay.
|
||||
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 1;
|
||||
// The maximum multiplier we apply to the base average Poisson delay.
|
||||
const MAX_DELAY_MULTIPLIER: u32 = 6;
|
||||
// The minium multiplier we apply to the base average Poisson delay.
|
||||
@@ -100,22 +103,27 @@ impl SendingDelayController {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn record_backpressure_detected(&mut self) {
|
||||
self.time_when_backpressure_detected = get_time_now();
|
||||
}
|
||||
|
||||
pub(crate) fn not_increased_delay_recently(&self) -> bool {
|
||||
get_time_now()
|
||||
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
|
||||
}
|
||||
|
||||
pub(crate) fn is_sending_reliable(&self) -> bool {
|
||||
let now = get_time_now();
|
||||
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
|
||||
let acceptable_time_without_backpressure =
|
||||
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
|
||||
pub(crate) fn not_decreased_delay_recently(&self) -> bool {
|
||||
get_time_now()
|
||||
> self.time_when_changed + Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
|
||||
}
|
||||
|
||||
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
|
||||
&& now > self.time_when_changed + delay_change_interval
|
||||
pub(crate) fn is_backpressure_currently_detected(&self, queue_length: usize) -> bool {
|
||||
queue_length > BACKPRESSURE_THRESHOLD
|
||||
}
|
||||
|
||||
pub(crate) fn record_backpressure_detected(&mut self) {
|
||||
self.time_when_backpressure_detected = get_time_now();
|
||||
}
|
||||
|
||||
pub(crate) fn was_backpressure_detected_recently(&self) -> bool {
|
||||
get_time_now()
|
||||
< self.time_when_backpressure_detected
|
||||
+ Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
|
||||
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
// Buffer Requests to say "hey, send any reconstructed messages to this channel"
|
||||
// or to say "hey, I'm going offline, don't send anything more to me. Just buffer them instead"
|
||||
@@ -47,8 +48,14 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
|
||||
|
||||
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
fn recover_from_fragment(&mut self, fragment_data: &[u8]) -> Option<NymMessage> {
|
||||
let fragment_len = fragment_data.len();
|
||||
if nym_sphinx::cover::is_cover(fragment_data) {
|
||||
trace!("The message was a loop cover message! Skipping it");
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("Cover received_{}_{}",time, fragment_len);
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -60,6 +67,12 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
Ok(frag) => frag,
|
||||
};
|
||||
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
println!("real received : _{:?}_{}_{}_{}", &fragment.id(),&fragment.current_fragment(),time, fragment_len);
|
||||
|
||||
if self.recently_reconstructed.contains(&fragment.id()) {
|
||||
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
|
||||
return None;
|
||||
|
||||
Reference in New Issue
Block a user