Compare commits

...

2 Commits

Author SHA1 Message Date
Jon Häggblad 1486450b6e WIP 2022-10-07 16:27:44 +02:00
Jędrzej Stuczyński a79107cd9a Introduced disable_main_poisson_packet_distribution to force real_traffic_stream to disable poisson sending 2022-10-06 12:25:38 +02:00
13 changed files with 163 additions and 67 deletions
@@ -37,7 +37,7 @@ impl AcknowledgementListener {
} }
async fn on_ack(&mut self, ack_content: Vec<u8>) { async fn on_ack(&mut self, ack_content: Vec<u8>) {
debug!("Received an ack"); trace!("Received an ack");
let frag_id = match recover_identifier(&self.ack_key, &ack_content) let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes) .map(FragmentIdentifier::try_from_bytes)
{ {
@@ -60,7 +60,7 @@ impl AcknowledgementListener {
return; return;
} }
trace!("Received {} from the mix network", frag_id); debug!("Received {} from the mix network", frag_id);
self.action_sender self.action_sender
.unbounded_send(Action::new_remove(frag_id)) .unbounded_send(Action::new_remove(frag_id))
@@ -128,7 +128,7 @@ impl ActionController {
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) { fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
for pending_ack in pending_acks { for pending_ack in pending_acks {
let frag_id = pending_ack.message_chunk.fragment_identifier(); let frag_id = pending_ack.message_chunk.fragment_identifier();
trace!("{} is inserted", frag_id); debug!("{} is inserted", frag_id);
if self if self
.pending_acks_data .pending_acks_data
@@ -141,7 +141,7 @@ impl ActionController {
} }
fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) { fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) {
trace!("{} is starting its timer", frag_id); debug!("{} is starting its timer", frag_id);
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) { if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
if queue_key.is_some() { if queue_key.is_some() {
@@ -164,7 +164,7 @@ impl ActionController {
} }
fn handle_remove(&mut self, frag_id: FragmentIdentifier) { fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
trace!("{} is getting removed", frag_id); debug!("{} is getting removed", frag_id);
match self.pending_acks_data.remove(&frag_id) { match self.pending_acks_data.remove(&frag_id) {
None => { None => {
@@ -195,7 +195,7 @@ impl ActionController {
// initiated basically as a first step of retransmission. At first data has its delay updated // initiated basically as a first step of retransmission. At first data has its delay updated
// (as new sphinx packet was created with new expected delivery time) // (as new sphinx packet was created with new expected delivery time)
fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) { fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
trace!("{} is updating its delay", frag_id); debug!("{} is updating its delay", frag_id);
// TODO: is it possible to solve this without either locking or temporarily removing the value? // TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) { if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
// this Action is triggered by `RetransmissionRequestListener` which held the other potential // this Action is triggered by `RetransmissionRequestListener` which held the other potential
@@ -221,7 +221,7 @@ impl ActionController {
// about it. Perhaps just reschedule it at later point? // about it. Perhaps just reschedule it at later point?
let frag_id = expired_ack.into_inner(); let frag_id = expired_ack.into_inner();
trace!("{} has expired", frag_id); debug!("{} has expired", frag_id);
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) { if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
if queue_key.is_none() { if queue_key.is_none() {
@@ -104,6 +104,7 @@ where
content: Vec<u8>, content: Vec<u8>,
with_reply_surb: bool, with_reply_surb: bool,
) -> Option<Vec<RealMessage>> { ) -> Option<Vec<RealMessage>> {
log::debug!("handle_fresh_message");
let topology_permit = self.topology_access.get_read_permit().await; let topology_permit = self.topology_access.get_read_permit().await;
let topology = match topology_permit let topology = match topology_permit
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient)) .try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
@@ -160,6 +161,7 @@ where
} }
async fn on_input_message(&mut self, msg: InputMessage) { async fn on_input_message(&mut self, msg: InputMessage) {
log::debug!("on_input_message");
let real_messages = match msg { let real_messages = match msg {
InputMessage::Fresh { InputMessage::Fresh {
recipient, recipient,
@@ -178,6 +178,9 @@ where
config.average_ack_delay, config.average_ack_delay,
); );
let message_preparer =
message_preparer.with_packet_size(nymsphinx::params::PacketSize::ExtendedPacket);
// will listen for any acks coming from the network // will listen for any acks coming from the network
let acknowledgement_listener = AcknowledgementListener::new( let acknowledgement_listener = AcknowledgementListener::new(
Arc::clone(&ack_key), Arc::clone(&ack_key),
@@ -50,9 +50,15 @@ pub struct Config {
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode. /// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
average_ack_delay_duration: Duration, average_ack_delay_duration: Duration,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
disable_main_poisson_packet_distribution: bool,
} }
impl Config { impl Config {
// TODO: change the config into a builder
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
ack_key: Arc<AckKey>, ack_key: Arc<AckKey>,
ack_wait_multiplier: f64, ack_wait_multiplier: f64,
@@ -60,6 +66,7 @@ impl Config {
average_ack_delay_duration: Duration, average_ack_delay_duration: Duration,
average_message_sending_delay: Duration, average_message_sending_delay: Duration,
average_packet_delay_duration: Duration, average_packet_delay_duration: Duration,
disable_main_poisson_packet_distribution: bool,
self_recipient: Recipient, self_recipient: Recipient,
) -> Self { ) -> Self {
Config { Config {
@@ -70,6 +77,7 @@ impl Config {
average_message_sending_delay, average_message_sending_delay,
average_packet_delay_duration, average_packet_delay_duration,
average_ack_delay_duration, average_ack_delay_duration,
disable_main_poisson_packet_distribution,
} }
} }
} }
@@ -128,6 +136,7 @@ impl RealMessagesController<OsRng> {
config.average_ack_delay_duration, config.average_ack_delay_duration,
config.average_packet_delay_duration, config.average_packet_delay_duration,
config.average_message_sending_delay, config.average_message_sending_delay,
config.disable_main_poisson_packet_distribution,
); );
let out_queue_control = OutQueueControl::new( let out_queue_control = OutQueueControl::new(
@@ -32,6 +32,10 @@ pub(crate) struct Config {
/// Average delay between sending subsequent packets. /// Average delay between sending subsequent packets.
average_message_sending_delay: Duration, average_message_sending_delay: Duration,
/// Controls whether the stream constantly produces packets according to the predefined
/// poisson distribution.
disable_poisson_packet_distribution: bool,
} }
impl Config { impl Config {
@@ -39,11 +43,13 @@ impl Config {
average_ack_delay: Duration, average_ack_delay: Duration,
average_packet_delay: Duration, average_packet_delay: Duration,
average_message_sending_delay: Duration, average_message_sending_delay: Duration,
disable_poisson_packet_distribution: bool,
) -> Self { ) -> Self {
Config { Config {
average_ack_delay, average_ack_delay,
average_packet_delay, average_packet_delay,
average_message_sending_delay, average_message_sending_delay,
disable_poisson_packet_distribution,
} }
} }
} }
@@ -63,7 +69,7 @@ where
/// Internal state, determined by `average_message_sending_delay`, /// Internal state, determined by `average_message_sending_delay`,
/// used to keep track of when a next packet should be sent out. /// used to keep track of when a next packet should be sent out.
next_delay: Pin<Box<time::Sleep>>, next_delay: Option<Pin<Box<time::Sleep>>>,
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them /// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays. /// out to the network without any further delays.
@@ -113,55 +119,6 @@ pub(crate) enum StreamMessage {
Real(Box<RealMessage>), Real(Box<RealMessage>),
} }
impl<R> Stream for OutQueueControl<R>
where
R: CryptoRng + Rng + Unpin,
{
type Item = StreamMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// it is not yet time to return a message
if self.next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
};
// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let avg_delay = self.config.average_message_sending_delay;
let now = self.next_delay.deadline();
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
// The next interval value is `next_poisson_delay` after the one that just
// yielded.
let next = now + next_poisson_delay;
self.next_delay.as_mut().reset(next);
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
// decide what kind of message to send
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
}
// otherwise construct a dummy one
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
}
}
}
impl<R> OutQueueControl<R> impl<R> OutQueueControl<R>
where where
R: CryptoRng + Rng + Unpin, R: CryptoRng + Rng + Unpin,
@@ -184,7 +141,7 @@ where
config, config,
ack_key, ack_key,
sent_notifier, sent_notifier,
next_delay: Box::pin(time::sleep(Default::default())), next_delay: None,
mix_tx, mix_tx,
real_receiver, real_receiver,
our_full_destination, our_full_destination,
@@ -265,12 +222,6 @@ where
// Send messages at certain rate and if no real traffic is available, send cover message. // Send messages at certain rate and if no real traffic is available, send cover message.
async fn run_normal_out_queue(&mut self) { async fn run_normal_out_queue(&mut self) {
// we should set initial delay only when we actually start the stream
self.next_delay = Box::pin(time::sleep(sample_poisson_duration(
&mut self.rng,
self.config.average_message_sending_delay,
)));
let mut shutdown = self.shutdown.clone(); let mut shutdown = self.shutdown.clone();
while !shutdown.is_shutdown() { while !shutdown.is_shutdown() {
tokio::select! { tokio::select! {
@@ -297,4 +248,110 @@ where
debug!("Starting out queue controller..."); debug!("Starting out queue controller...");
self.run_normal_out_queue().await self.run_normal_out_queue().await
} }
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
};
// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let avg_delay = self.config.average_message_sending_delay;
let now = next_delay.deadline();
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
// The next interval value is `next_poisson_delay` after the one that just
// yielded.
let next = now + next_poisson_delay;
next_delay.as_mut().reset(next);
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
// decide what kind of message to send
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
}
// otherwise construct a dummy one
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
}
} else {
// we never set an initial delay - let's do it now
cx.waker().wake_by_ref();
self.next_delay = Some(Box::pin(time::sleep(sample_poisson_duration(
&mut self.rng,
self.config.average_message_sending_delay,
))));
Poll::Pending
}
}
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
// if there are more messages immediately available, notify the runtime
// because we should be polled again
if !self.received_buffer.is_empty() {
cx.waker().wake_by_ref()
}
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
}
// if there's nothing, then there's nothing
Poll::Pending => Poll::Pending,
}
}
fn poll_next_message(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<StreamMessage>> {
if self.config.disable_poisson_packet_distribution {
self.poll_immediate(cx)
} else {
self.poll_poisson(cx)
}
}
}
impl<R> Stream for OutQueueControl<R>
where
R: CryptoRng + Rng + Unpin,
{
type Item = StreamMessage;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_message(cx)
}
} }
@@ -73,6 +73,11 @@ impl ReceivedMessagesBufferInner {
if self.recently_reconstructed.contains(&fragment.id()) { if self.recently_reconstructed.contains(&fragment.id()) {
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id()); debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
return None; return None;
} else {
debug!(
"Received a chunk of message ({:?}) for the first time",
fragment.id()
);
} }
// if we returned an error the underlying message is malformed in some way // if we returned an error the underlying message is malformed in some way
@@ -204,7 +209,7 @@ impl ReceivedMessagesBuffer {
} }
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) { async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
debug!( trace!(
"Processing {:?} new message that might get added to the buffer!", "Processing {:?} new message that might get added to the buffer!",
msgs.len() msgs.len()
); );
+12 -1
View File
@@ -14,8 +14,10 @@ pub const MISSING_VALUE: &str = "MISSING VALUE";
// 'DEBUG' // 'DEBUG'
const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 1.5; const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 1.5;
//const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 10.0;
const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(1_500); //const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(1_500);
const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(10_000);
const DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(200); const DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(200);
const DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(20); const DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(20);
const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(50); const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(50);
@@ -236,6 +238,10 @@ impl<T: NymConfig> Config<T> {
self.debug.topology_resolution_timeout self.debug.topology_resolution_timeout
} }
pub fn get_disabled_main_poisson_packet_distribution(&self) -> bool {
self.debug.disable_main_poisson_packet_distribution
}
pub fn get_version(&self) -> &str { pub fn get_version(&self) -> &str {
&self.client.version &self.client.version
} }
@@ -445,6 +451,10 @@ pub struct Debug {
/// did not reach its destination. /// did not reach its destination.
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
topology_resolution_timeout: Duration, topology_resolution_timeout: Duration,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
disable_main_poisson_packet_distribution: bool,
} }
impl Default for Debug { impl Default for Debug {
@@ -459,6 +469,7 @@ impl Default for Debug {
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT, gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE, topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT, topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_main_poisson_packet_distribution: false,
} }
} }
} }
+3
View File
@@ -121,6 +121,9 @@ impl NymClient {
self.config.get_base().get_average_ack_delay(), self.config.get_base().get_average_ack_delay(),
self.config.get_base().get_message_sending_average_delay(), self.config.get_base().get_message_sending_average_delay(),
self.config.get_base().get_average_packet_delay(), self.config.get_base().get_average_packet_delay(),
self.config
.get_base()
.get_disabled_main_poisson_packet_distribution(),
self.as_mix_recipient(), self.as_mix_recipient(),
); );
+3
View File
@@ -122,6 +122,9 @@ impl NymClient {
self.config.get_base().get_average_ack_delay(), self.config.get_base().get_average_ack_delay(),
self.config.get_base().get_message_sending_average_delay(), self.config.get_base().get_message_sending_average_delay(),
self.config.get_base().get_average_packet_delay(), self.config.get_base().get_average_packet_delay(),
self.config
.get_base()
.get_disabled_main_poisson_packet_distribution(),
self.as_mix_recipient(), self.as_mix_recipient(),
); );
+1
View File
@@ -52,5 +52,6 @@ fn setup_logging() {
.filter_module("want", log::LevelFilter::Warn) .filter_module("want", log::LevelFilter::Warn)
.filter_module("tungstenite", log::LevelFilter::Warn) .filter_module("tungstenite", log::LevelFilter::Warn)
.filter_module("tokio_tungstenite", log::LevelFilter::Warn) .filter_module("tokio_tungstenite", log::LevelFilter::Warn)
.filter_module("sled", log::LevelFilter::Warn)
.init(); .init();
} }
@@ -63,7 +63,7 @@ impl PacketRouter {
} else if received_packet.len() } else if received_packet.len()
== PacketSize::ExtendedPacket.plaintext_size() - ack_overhead == PacketSize::ExtendedPacket.plaintext_size() - ack_overhead
{ {
warn!("received extended packet? Did not expect this..."); //warn!("received extended packet? Did not expect this...");
received_messages.push(received_packet); received_messages.push(received_packet);
} else { } else {
// this can happen if other clients are not padding their messages // this can happen if other clients are not padding their messages
+2
View File
@@ -271,6 +271,8 @@ where
// including set of delays // including set of delays
let delays = delays::generate_from_average_duration(route.len(), self.average_packet_delay); let delays = delays::generate_from_average_duration(route.len(), self.average_packet_delay);
//let delays = vec![Delay::new_from_nanos(0); route.len()];
//assert!(delays_pre.len() == delays.len());
// create the actual sphinx packet here. With valid route and correct payload size, // create the actual sphinx packet here. With valid route and correct payload size,
// there's absolutely no reason for this call to fail. // there's absolutely no reason for this call to fail.