Compare commits

...

4 Commits

Author SHA1 Message Date
Jon Häggblad 230536d0d8 Somewhat working 2022-11-11 14:36:10 +01:00
Jon Häggblad 025fdc913d WIP 2022-11-11 10:23:44 +01:00
Jon Häggblad c175401562 WIP 2022-11-11 09:39:50 +01:00
Jon Häggblad 369c72ad76 WIP 2022-11-10 16:19:45 +01:00
3 changed files with 69 additions and 163 deletions
@@ -92,7 +92,8 @@ where
// while we were messing with topology, wrapping data in sphinx, etc. we actually received
// this ack after all! no need to retransmit then
debug!("We received an ack JUST as we were about to retransmit [2]");
return;
panic!("will we reach this?");
//return;
}
// we no longer need the reference - let's drop it so that if somehow `UpdateTimer` action
// reached the controller before this function terminated, the controller would not panic.
@@ -94,7 +94,7 @@ impl Config {
pub struct RealMessagesController<R>
where
R: CryptoRng + Rng,
R: CryptoRng + Rng + Clone,
{
out_queue_control: OutQueueControl<R>,
ack_control: AcknowledgementController<R>,
@@ -5,8 +5,7 @@ use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use futures::StreamExt;
use log::*;
use nymsphinx::acknowledgements::AckKey;
use nymsphinx::addressing::clients::Recipient;
@@ -17,7 +16,6 @@ use nymsphinx::params::PacketSize;
use nymsphinx::utils::sample_poisson_duration;
use rand::{CryptoRng, Rng};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -44,6 +42,8 @@ const MAX_DELAY_MULTIPLIER: u32 = 6;
// The minium multiplier we apply to the base average Poisson delay.
const MIN_DELAY_MULTIPLIER: u32 = 1;
type SharedSendingDelayController = Arc<std::sync::Mutex<SendingDelayController>>;
/// Configurable parameters of the `OutQueueControl`
pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
@@ -182,7 +182,7 @@ impl SendingDelayController {
pub(crate) struct OutQueueControl<R>
where
R: CryptoRng + Rng,
R: CryptoRng + Rng + Clone,
{
/// Configurable parameters of the `ActionController`
config: Config,
@@ -193,17 +193,9 @@ where
/// Channel used for notifying of a real packet being sent out. Used to start up retransmission timer.
sent_notifier: SentPacketNotificationSender,
/// Internal state, determined by `average_message_sending_delay`,
/// used to keep track of when a next packet should be sent out.
#[cfg(not(target_arch = "wasm32"))]
next_delay: Option<Pin<Box<time::Sleep>>>,
#[cfg(target_arch = "wasm32")]
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
// To make sure we don't overload the mix_tx channel, we limit the rate we are pushing
// messages.
sending_rate_controller: SendingDelayController,
sending_rate_controller: SharedSendingDelayController,
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays.
@@ -252,7 +244,7 @@ pub(crate) enum StreamMessage {
impl<R> OutQueueControl<R>
where
R: CryptoRng + Rng + Unpin,
R: CryptoRng + Rng + Unpin + Clone,
{
// at this point I'm not entirely sure how to deal with this warning without
// some considerable refactoring
@@ -271,11 +263,10 @@ where
config,
ack_key,
sent_notifier,
next_delay: None,
sending_rate_controller: SendingDelayController::new(
sending_rate_controller: Arc::new(std::sync::Mutex::new(SendingDelayController::new(
MIN_DELAY_MULTIPLIER,
MAX_DELAY_MULTIPLIER,
),
))),
mix_tx,
real_receiver,
our_full_destination,
@@ -357,148 +348,30 @@ where
tokio::task::yield_now().await;
}
fn current_average_message_sending_delay(&self) -> Duration {
self.config.average_message_sending_delay
* self.sending_rate_controller.current_multiplier()
}
fn adjust_current_average_message_sending_delay(&mut self) {
let mut ctrl = self.sending_rate_controller.lock().unwrap();
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
log::trace!(
"used_slots: {used_slots}, current_multiplier: {}",
self.sending_rate_controller.current_multiplier()
ctrl.current_multiplier()
);
// Even just a single used slot is enough to signal backpressure
if used_slots > 0 {
log::trace!("Backpressure detected");
self.sending_rate_controller.record_backpressure_detected();
ctrl.record_backpressure_detected();
}
// If the buffer is running out, slow down the sending rate
if self.mix_tx.capacity() == 0
&& self.sending_rate_controller.not_increased_delay_recently()
{
self.sending_rate_controller.increase_delay_multiplier();
if self.mix_tx.capacity() == 0 && ctrl.not_increased_delay_recently() {
ctrl.increase_delay_multiplier();
}
// Very carefully step up the sending rate in case it seems like we can solidly handle the
// current rate.
if self.sending_rate_controller.is_sending_reliable() {
self.sending_rate_controller.decrease_delay_multiplier();
}
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected.
self.adjust_current_average_message_sending_delay();
let avg_delay = self.current_average_message_sending_delay();
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
};
// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
// The next interval value is `next_poisson_delay` after the one that just
// yielded.
#[cfg(not(target_arch = "wasm32"))]
{
let now = next_delay.deadline();
let next = now + next_poisson_delay;
next_delay.as_mut().reset(next);
}
#[cfg(target_arch = "wasm32")]
{
next_delay.as_mut().reset(next_poisson_delay);
}
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
// decide what kind of message to send
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
}
// otherwise construct a dummy one
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
}
} else {
// we never set an initial delay - let's do it now
cx.waker().wake_by_ref();
let sampled =
sample_poisson_duration(&mut self.rng, self.config.average_message_sending_delay);
#[cfg(not(target_arch = "wasm32"))]
let next_delay = Box::pin(time::sleep(sampled));
#[cfg(target_arch = "wasm32")]
let next_delay = Box::pin(wasm_timer::Delay::new(sampled));
self.next_delay = Some(next_delay);
Poll::Pending
}
}
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
// if there are more messages immediately available, notify the runtime
// because we should be polled again
if !self.received_buffer.is_empty() {
cx.waker().wake_by_ref()
}
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
}
// if there's nothing, then there's nothing
Poll::Pending => Poll::Pending,
}
}
fn poll_next_message(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<StreamMessage>> {
if self.config.disable_poisson_packet_distribution {
self.poll_immediate(cx)
} else {
self.poll_poisson(cx)
if ctrl.is_sending_reliable() {
ctrl.decrease_delay_multiplier();
}
}
@@ -506,44 +379,76 @@ where
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
debug!("Started OutQueueControl with graceful shutdown support");
let mut poisson_delay_timer = PoissonDelayTimer::new(
self.config.average_message_sending_delay,
self.sending_rate_controller.clone(),
self.rng.clone(),
);
let poisson_delay_stream = poisson_delay_timer.as_stream();
tokio::pin!(poisson_delay_stream);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
}
next_message = self.next() => match next_message {
Some(next_message) => {
self.on_message(next_message).await;
},
None => {
messages = self.real_receiver.next() => {
if let Some(real_messages) = messages {
self.received_buffer = real_messages.into();
} else {
log::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
_ = poisson_delay_stream.next() => {
self.adjust_current_average_message_sending_delay();
let msg = match self.received_buffer.pop_front() {
Some(msg) => StreamMessage::Real(Box::new(msg)),
None => StreamMessage::Cover
};
self.on_message(msg).await;
}
}
}
assert!(shutdown.is_shutdown_poll());
log::debug!("OutQueueControl: Exiting");
}
}
#[cfg(target_arch = "wasm32")]
pub(super) async fn run(&mut self) {
debug!("Started OutQueueControl without graceful shutdown support");
struct PoissonDelayTimer<R> {
average_message_sending_delay: Duration,
sending_rate_controller: SharedSendingDelayController,
rng: R,
}
while let Some(next_message) = self.next().await {
self.on_message(next_message).await;
impl<R> PoissonDelayTimer<R>
where
R: Rng + CryptoRng,
{
fn new(
average_message_sending_delay: Duration,
sending_rate_controller: SharedSendingDelayController,
rng: R,
) -> Self {
Self {
average_message_sending_delay,
sending_rate_controller,
rng,
}
}
}
impl<R> Stream for OutQueueControl<R>
where
R: CryptoRng + Rng + Unpin,
{
type Item = StreamMessage;
fn current_average_message_sending_delay(&self) -> Duration {
let ctrl = self.sending_rate_controller.lock().unwrap();
self.average_message_sending_delay * ctrl.current_multiplier()
}
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_message(cx)
fn as_stream(&mut self) -> impl futures::Stream<Item = ()> + '_ {
futures::stream::unfold(self, |out_queue_control| async {
let avg_delay = out_queue_control.current_average_message_sending_delay();
let next_poisson_delay = sample_poisson_duration(&mut out_queue_control.rng, avg_delay);
time::sleep(next_poisson_delay).await;
Some(((), out_queue_control))
})
}
}