|
|
|
@@ -5,8 +5,7 @@ use crate::client::mix_traffic::BatchMixMessageSender;
|
|
|
|
|
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
|
|
|
|
use crate::client::topology_control::TopologyAccessor;
|
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
|
use futures::task::{Context, Poll};
|
|
|
|
|
use futures::{Future, Stream, StreamExt};
|
|
|
|
|
use futures::StreamExt;
|
|
|
|
|
use log::*;
|
|
|
|
|
use nymsphinx::acknowledgements::AckKey;
|
|
|
|
|
use nymsphinx::addressing::clients::Recipient;
|
|
|
|
@@ -17,7 +16,6 @@ use nymsphinx::params::PacketSize;
|
|
|
|
|
use nymsphinx::utils::sample_poisson_duration;
|
|
|
|
|
use rand::{CryptoRng, Rng};
|
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
|
@@ -44,6 +42,8 @@ const MAX_DELAY_MULTIPLIER: u32 = 6;
|
|
|
|
|
// The minium multiplier we apply to the base average Poisson delay.
|
|
|
|
|
const MIN_DELAY_MULTIPLIER: u32 = 1;
|
|
|
|
|
|
|
|
|
|
type SharedSendingDelayController = Arc<std::sync::Mutex<SendingDelayController>>;
|
|
|
|
|
|
|
|
|
|
/// Configurable parameters of the `OutQueueControl`
|
|
|
|
|
pub(crate) struct Config {
|
|
|
|
|
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
|
|
|
@@ -182,7 +182,7 @@ impl SendingDelayController {
|
|
|
|
|
|
|
|
|
|
pub(crate) struct OutQueueControl<R>
|
|
|
|
|
where
|
|
|
|
|
R: CryptoRng + Rng,
|
|
|
|
|
R: CryptoRng + Rng + Clone,
|
|
|
|
|
{
|
|
|
|
|
/// Configurable parameters of the `ActionController`
|
|
|
|
|
config: Config,
|
|
|
|
@@ -193,17 +193,9 @@ where
|
|
|
|
|
/// Channel used for notifying of a real packet being sent out. Used to start up retransmission timer.
|
|
|
|
|
sent_notifier: SentPacketNotificationSender,
|
|
|
|
|
|
|
|
|
|
/// Internal state, determined by `average_message_sending_delay`,
|
|
|
|
|
/// used to keep track of when a next packet should be sent out.
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
|
next_delay: Option<Pin<Box<time::Sleep>>>,
|
|
|
|
|
|
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
|
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
|
|
|
|
|
|
|
|
|
|
// To make sure we don't overload the mix_tx channel, we limit the rate we are pushing
|
|
|
|
|
// messages.
|
|
|
|
|
sending_rate_controller: SendingDelayController,
|
|
|
|
|
sending_rate_controller: SharedSendingDelayController,
|
|
|
|
|
|
|
|
|
|
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
|
|
|
|
/// out to the network without any further delays.
|
|
|
|
@@ -252,7 +244,7 @@ pub(crate) enum StreamMessage {
|
|
|
|
|
|
|
|
|
|
impl<R> OutQueueControl<R>
|
|
|
|
|
where
|
|
|
|
|
R: CryptoRng + Rng + Unpin,
|
|
|
|
|
R: CryptoRng + Rng + Unpin + Clone,
|
|
|
|
|
{
|
|
|
|
|
// at this point I'm not entirely sure how to deal with this warning without
|
|
|
|
|
// some considerable refactoring
|
|
|
|
@@ -271,11 +263,10 @@ where
|
|
|
|
|
config,
|
|
|
|
|
ack_key,
|
|
|
|
|
sent_notifier,
|
|
|
|
|
next_delay: None,
|
|
|
|
|
sending_rate_controller: SendingDelayController::new(
|
|
|
|
|
sending_rate_controller: Arc::new(std::sync::Mutex::new(SendingDelayController::new(
|
|
|
|
|
MIN_DELAY_MULTIPLIER,
|
|
|
|
|
MAX_DELAY_MULTIPLIER,
|
|
|
|
|
),
|
|
|
|
|
))),
|
|
|
|
|
mix_tx,
|
|
|
|
|
real_receiver,
|
|
|
|
|
our_full_destination,
|
|
|
|
@@ -357,148 +348,30 @@ where
|
|
|
|
|
tokio::task::yield_now().await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn current_average_message_sending_delay(&self) -> Duration {
|
|
|
|
|
self.config.average_message_sending_delay
|
|
|
|
|
* self.sending_rate_controller.current_multiplier()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn adjust_current_average_message_sending_delay(&mut self) {
|
|
|
|
|
let mut ctrl = self.sending_rate_controller.lock().unwrap();
|
|
|
|
|
|
|
|
|
|
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
|
|
|
|
|
log::trace!(
|
|
|
|
|
"used_slots: {used_slots}, current_multiplier: {}",
|
|
|
|
|
self.sending_rate_controller.current_multiplier()
|
|
|
|
|
ctrl.current_multiplier()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Even just a single used slot is enough to signal backpressure
|
|
|
|
|
if used_slots > 0 {
|
|
|
|
|
log::trace!("Backpressure detected");
|
|
|
|
|
self.sending_rate_controller.record_backpressure_detected();
|
|
|
|
|
ctrl.record_backpressure_detected();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If the buffer is running out, slow down the sending rate
|
|
|
|
|
if self.mix_tx.capacity() == 0
|
|
|
|
|
&& self.sending_rate_controller.not_increased_delay_recently()
|
|
|
|
|
{
|
|
|
|
|
self.sending_rate_controller.increase_delay_multiplier();
|
|
|
|
|
if self.mix_tx.capacity() == 0 && ctrl.not_increased_delay_recently() {
|
|
|
|
|
ctrl.increase_delay_multiplier();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Very carefully step up the sending rate in case it seems like we can solidly handle the
|
|
|
|
|
// current rate.
|
|
|
|
|
if self.sending_rate_controller.is_sending_reliable() {
|
|
|
|
|
self.sending_rate_controller.decrease_delay_multiplier();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
|
|
|
|
// The average delay could change depending on if backpressure in the downstream channel
|
|
|
|
|
// (mix_tx) was detected.
|
|
|
|
|
self.adjust_current_average_message_sending_delay();
|
|
|
|
|
let avg_delay = self.current_average_message_sending_delay();
|
|
|
|
|
|
|
|
|
|
if let Some(ref mut next_delay) = &mut self.next_delay {
|
|
|
|
|
// it is not yet time to return a message
|
|
|
|
|
if next_delay.as_mut().poll(cx).is_pending() {
|
|
|
|
|
return Poll::Pending;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// we know it's time to send a message, so let's prepare delay for the next one
|
|
|
|
|
// Get the `now` by looking at the current `delay` deadline
|
|
|
|
|
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
|
|
|
|
|
|
|
|
|
// The next interval value is `next_poisson_delay` after the one that just
|
|
|
|
|
// yielded.
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
|
{
|
|
|
|
|
let now = next_delay.deadline();
|
|
|
|
|
let next = now + next_poisson_delay;
|
|
|
|
|
next_delay.as_mut().reset(next);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
|
{
|
|
|
|
|
next_delay.as_mut().reset(next_poisson_delay);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check if we have anything immediately available
|
|
|
|
|
if let Some(real_available) = self.received_buffer.pop_front() {
|
|
|
|
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// decide what kind of message to send
|
|
|
|
|
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
|
|
|
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
|
|
|
|
// (and whoever is using the stream should panic)
|
|
|
|
|
Poll::Ready(None) => Poll::Ready(None),
|
|
|
|
|
|
|
|
|
|
// if there are more messages available, return first one and store the rest
|
|
|
|
|
Poll::Ready(Some(real_messages)) => {
|
|
|
|
|
self.received_buffer = real_messages.into();
|
|
|
|
|
// we MUST HAVE received at least ONE message
|
|
|
|
|
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
|
|
|
|
self.received_buffer.pop_front().unwrap(),
|
|
|
|
|
))))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// otherwise construct a dummy one
|
|
|
|
|
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// we never set an initial delay - let's do it now
|
|
|
|
|
cx.waker().wake_by_ref();
|
|
|
|
|
|
|
|
|
|
let sampled =
|
|
|
|
|
sample_poisson_duration(&mut self.rng, self.config.average_message_sending_delay);
|
|
|
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
|
let next_delay = Box::pin(time::sleep(sampled));
|
|
|
|
|
|
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
|
let next_delay = Box::pin(wasm_timer::Delay::new(sampled));
|
|
|
|
|
|
|
|
|
|
self.next_delay = Some(next_delay);
|
|
|
|
|
|
|
|
|
|
Poll::Pending
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
|
|
|
|
// check if we have anything immediately available
|
|
|
|
|
if let Some(real_available) = self.received_buffer.pop_front() {
|
|
|
|
|
// if there are more messages immediately available, notify the runtime
|
|
|
|
|
// because we should be polled again
|
|
|
|
|
if !self.received_buffer.is_empty() {
|
|
|
|
|
cx.waker().wake_by_ref()
|
|
|
|
|
}
|
|
|
|
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
|
|
|
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
|
|
|
|
// (and whoever is using the stream should panic)
|
|
|
|
|
Poll::Ready(None) => Poll::Ready(None),
|
|
|
|
|
|
|
|
|
|
// if there are more messages available, return first one and store the rest
|
|
|
|
|
Poll::Ready(Some(real_messages)) => {
|
|
|
|
|
self.received_buffer = real_messages.into();
|
|
|
|
|
// we MUST HAVE received at least ONE message
|
|
|
|
|
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
|
|
|
|
self.received_buffer.pop_front().unwrap(),
|
|
|
|
|
))))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if there's nothing, then there's nothing
|
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_next_message(
|
|
|
|
|
mut self: Pin<&mut Self>,
|
|
|
|
|
cx: &mut Context<'_>,
|
|
|
|
|
) -> Poll<Option<StreamMessage>> {
|
|
|
|
|
if self.config.disable_poisson_packet_distribution {
|
|
|
|
|
self.poll_immediate(cx)
|
|
|
|
|
} else {
|
|
|
|
|
self.poll_poisson(cx)
|
|
|
|
|
if ctrl.is_sending_reliable() {
|
|
|
|
|
ctrl.decrease_delay_multiplier();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -506,44 +379,76 @@ where
|
|
|
|
|
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
|
|
|
|
|
debug!("Started OutQueueControl with graceful shutdown support");
|
|
|
|
|
|
|
|
|
|
let mut poisson_delay_timer = PoissonDelayTimer::new(
|
|
|
|
|
self.config.average_message_sending_delay,
|
|
|
|
|
self.sending_rate_controller.clone(),
|
|
|
|
|
self.rng.clone(),
|
|
|
|
|
);
|
|
|
|
|
let poisson_delay_stream = poisson_delay_timer.as_stream();
|
|
|
|
|
tokio::pin!(poisson_delay_stream);
|
|
|
|
|
|
|
|
|
|
while !shutdown.is_shutdown() {
|
|
|
|
|
tokio::select! {
|
|
|
|
|
biased;
|
|
|
|
|
_ = shutdown.recv() => {
|
|
|
|
|
log::trace!("OutQueueControl: Received shutdown");
|
|
|
|
|
}
|
|
|
|
|
next_message = self.next() => match next_message {
|
|
|
|
|
Some(next_message) => {
|
|
|
|
|
self.on_message(next_message).await;
|
|
|
|
|
},
|
|
|
|
|
None => {
|
|
|
|
|
messages = self.real_receiver.next() => {
|
|
|
|
|
if let Some(real_messages) = messages {
|
|
|
|
|
self.received_buffer = real_messages.into();
|
|
|
|
|
} else {
|
|
|
|
|
log::trace!("OutQueueControl: Stopping since channel closed");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_ = poisson_delay_stream.next() => {
|
|
|
|
|
self.adjust_current_average_message_sending_delay();
|
|
|
|
|
let msg = match self.received_buffer.pop_front() {
|
|
|
|
|
Some(msg) => StreamMessage::Real(Box::new(msg)),
|
|
|
|
|
None => StreamMessage::Cover
|
|
|
|
|
};
|
|
|
|
|
self.on_message(msg).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
assert!(shutdown.is_shutdown_poll());
|
|
|
|
|
log::debug!("OutQueueControl: Exiting");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
|
pub(super) async fn run(&mut self) {
|
|
|
|
|
debug!("Started OutQueueControl without graceful shutdown support");
|
|
|
|
|
struct PoissonDelayTimer<R> {
|
|
|
|
|
average_message_sending_delay: Duration,
|
|
|
|
|
sending_rate_controller: SharedSendingDelayController,
|
|
|
|
|
rng: R,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while let Some(next_message) = self.next().await {
|
|
|
|
|
self.on_message(next_message).await;
|
|
|
|
|
impl<R> PoissonDelayTimer<R>
|
|
|
|
|
where
|
|
|
|
|
R: Rng + CryptoRng,
|
|
|
|
|
{
|
|
|
|
|
fn new(
|
|
|
|
|
average_message_sending_delay: Duration,
|
|
|
|
|
sending_rate_controller: SharedSendingDelayController,
|
|
|
|
|
rng: R,
|
|
|
|
|
) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
average_message_sending_delay,
|
|
|
|
|
sending_rate_controller,
|
|
|
|
|
rng,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R> Stream for OutQueueControl<R>
|
|
|
|
|
where
|
|
|
|
|
R: CryptoRng + Rng + Unpin,
|
|
|
|
|
{
|
|
|
|
|
type Item = StreamMessage;
|
|
|
|
|
fn current_average_message_sending_delay(&self) -> Duration {
|
|
|
|
|
let ctrl = self.sending_rate_controller.lock().unwrap();
|
|
|
|
|
self.average_message_sending_delay * ctrl.current_multiplier()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
|
self.poll_next_message(cx)
|
|
|
|
|
fn as_stream(&mut self) -> impl futures::Stream<Item = ()> + '_ {
|
|
|
|
|
futures::stream::unfold(self, |out_queue_control| async {
|
|
|
|
|
let avg_delay = out_queue_control.current_average_message_sending_delay();
|
|
|
|
|
let next_poisson_delay = sample_poisson_duration(&mut out_queue_control.rng, avg_delay);
|
|
|
|
|
time::sleep(next_poisson_delay).await;
|
|
|
|
|
Some(((), out_queue_control))
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|