Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1486450b6e | |||
| a79107cd9a |
+2
-2
@@ -37,7 +37,7 @@ impl AcknowledgementListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
||||||
debug!("Received an ack");
|
trace!("Received an ack");
|
||||||
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
||||||
.map(FragmentIdentifier::try_from_bytes)
|
.map(FragmentIdentifier::try_from_bytes)
|
||||||
{
|
{
|
||||||
@@ -60,7 +60,7 @@ impl AcknowledgementListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Received {} from the mix network", frag_id);
|
debug!("Received {} from the mix network", frag_id);
|
||||||
|
|
||||||
self.action_sender
|
self.action_sender
|
||||||
.unbounded_send(Action::new_remove(frag_id))
|
.unbounded_send(Action::new_remove(frag_id))
|
||||||
|
|||||||
+5
-5
@@ -128,7 +128,7 @@ impl ActionController {
|
|||||||
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
|
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
|
||||||
for pending_ack in pending_acks {
|
for pending_ack in pending_acks {
|
||||||
let frag_id = pending_ack.message_chunk.fragment_identifier();
|
let frag_id = pending_ack.message_chunk.fragment_identifier();
|
||||||
trace!("{} is inserted", frag_id);
|
debug!("{} is inserted", frag_id);
|
||||||
|
|
||||||
if self
|
if self
|
||||||
.pending_acks_data
|
.pending_acks_data
|
||||||
@@ -141,7 +141,7 @@ impl ActionController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) {
|
fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) {
|
||||||
trace!("{} is starting its timer", frag_id);
|
debug!("{} is starting its timer", frag_id);
|
||||||
|
|
||||||
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
|
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
|
||||||
if queue_key.is_some() {
|
if queue_key.is_some() {
|
||||||
@@ -164,7 +164,7 @@ impl ActionController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
|
fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
|
||||||
trace!("{} is getting removed", frag_id);
|
debug!("{} is getting removed", frag_id);
|
||||||
|
|
||||||
match self.pending_acks_data.remove(&frag_id) {
|
match self.pending_acks_data.remove(&frag_id) {
|
||||||
None => {
|
None => {
|
||||||
@@ -195,7 +195,7 @@ impl ActionController {
|
|||||||
// initiated basically as a first step of retransmission. At first data has its delay updated
|
// initiated basically as a first step of retransmission. At first data has its delay updated
|
||||||
// (as new sphinx packet was created with new expected delivery time)
|
// (as new sphinx packet was created with new expected delivery time)
|
||||||
fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
|
fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
|
||||||
trace!("{} is updating its delay", frag_id);
|
debug!("{} is updating its delay", frag_id);
|
||||||
// TODO: is it possible to solve this without either locking or temporarily removing the value?
|
// TODO: is it possible to solve this without either locking or temporarily removing the value?
|
||||||
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
|
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
|
||||||
// this Action is triggered by `RetransmissionRequestListener` which held the other potential
|
// this Action is triggered by `RetransmissionRequestListener` which held the other potential
|
||||||
@@ -221,7 +221,7 @@ impl ActionController {
|
|||||||
// about it. Perhaps just reschedule it at later point?
|
// about it. Perhaps just reschedule it at later point?
|
||||||
let frag_id = expired_ack.into_inner();
|
let frag_id = expired_ack.into_inner();
|
||||||
|
|
||||||
trace!("{} has expired", frag_id);
|
debug!("{} has expired", frag_id);
|
||||||
|
|
||||||
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
|
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
|
||||||
if queue_key.is_none() {
|
if queue_key.is_none() {
|
||||||
|
|||||||
+2
@@ -104,6 +104,7 @@ where
|
|||||||
content: Vec<u8>,
|
content: Vec<u8>,
|
||||||
with_reply_surb: bool,
|
with_reply_surb: bool,
|
||||||
) -> Option<Vec<RealMessage>> {
|
) -> Option<Vec<RealMessage>> {
|
||||||
|
log::debug!("handle_fresh_message");
|
||||||
let topology_permit = self.topology_access.get_read_permit().await;
|
let topology_permit = self.topology_access.get_read_permit().await;
|
||||||
let topology = match topology_permit
|
let topology = match topology_permit
|
||||||
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
|
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
|
||||||
@@ -160,6 +161,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn on_input_message(&mut self, msg: InputMessage) {
|
async fn on_input_message(&mut self, msg: InputMessage) {
|
||||||
|
log::debug!("on_input_message");
|
||||||
let real_messages = match msg {
|
let real_messages = match msg {
|
||||||
InputMessage::Fresh {
|
InputMessage::Fresh {
|
||||||
recipient,
|
recipient,
|
||||||
|
|||||||
@@ -178,6 +178,9 @@ where
|
|||||||
config.average_ack_delay,
|
config.average_ack_delay,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let message_preparer =
|
||||||
|
message_preparer.with_packet_size(nymsphinx::params::PacketSize::ExtendedPacket);
|
||||||
|
|
||||||
// will listen for any acks coming from the network
|
// will listen for any acks coming from the network
|
||||||
let acknowledgement_listener = AcknowledgementListener::new(
|
let acknowledgement_listener = AcknowledgementListener::new(
|
||||||
Arc::clone(&ack_key),
|
Arc::clone(&ack_key),
|
||||||
|
|||||||
@@ -50,9 +50,15 @@ pub struct Config {
|
|||||||
|
|
||||||
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
|
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
|
||||||
average_ack_delay_duration: Duration,
|
average_ack_delay_duration: Duration,
|
||||||
|
|
||||||
|
/// Controls whether the main packet stream constantly produces packets according to the predefined
|
||||||
|
/// poisson distribution.
|
||||||
|
disable_main_poisson_packet_distribution: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
// TODO: change the config into a builder
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
ack_key: Arc<AckKey>,
|
ack_key: Arc<AckKey>,
|
||||||
ack_wait_multiplier: f64,
|
ack_wait_multiplier: f64,
|
||||||
@@ -60,6 +66,7 @@ impl Config {
|
|||||||
average_ack_delay_duration: Duration,
|
average_ack_delay_duration: Duration,
|
||||||
average_message_sending_delay: Duration,
|
average_message_sending_delay: Duration,
|
||||||
average_packet_delay_duration: Duration,
|
average_packet_delay_duration: Duration,
|
||||||
|
disable_main_poisson_packet_distribution: bool,
|
||||||
self_recipient: Recipient,
|
self_recipient: Recipient,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Config {
|
Config {
|
||||||
@@ -70,6 +77,7 @@ impl Config {
|
|||||||
average_message_sending_delay,
|
average_message_sending_delay,
|
||||||
average_packet_delay_duration,
|
average_packet_delay_duration,
|
||||||
average_ack_delay_duration,
|
average_ack_delay_duration,
|
||||||
|
disable_main_poisson_packet_distribution,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -128,6 +136,7 @@ impl RealMessagesController<OsRng> {
|
|||||||
config.average_ack_delay_duration,
|
config.average_ack_delay_duration,
|
||||||
config.average_packet_delay_duration,
|
config.average_packet_delay_duration,
|
||||||
config.average_message_sending_delay,
|
config.average_message_sending_delay,
|
||||||
|
config.disable_main_poisson_packet_distribution,
|
||||||
);
|
);
|
||||||
|
|
||||||
let out_queue_control = OutQueueControl::new(
|
let out_queue_control = OutQueueControl::new(
|
||||||
|
|||||||
@@ -32,6 +32,10 @@ pub(crate) struct Config {
|
|||||||
|
|
||||||
/// Average delay between sending subsequent packets.
|
/// Average delay between sending subsequent packets.
|
||||||
average_message_sending_delay: Duration,
|
average_message_sending_delay: Duration,
|
||||||
|
|
||||||
|
/// Controls whether the stream constantly produces packets according to the predefined
|
||||||
|
/// poisson distribution.
|
||||||
|
disable_poisson_packet_distribution: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
@@ -39,11 +43,13 @@ impl Config {
|
|||||||
average_ack_delay: Duration,
|
average_ack_delay: Duration,
|
||||||
average_packet_delay: Duration,
|
average_packet_delay: Duration,
|
||||||
average_message_sending_delay: Duration,
|
average_message_sending_delay: Duration,
|
||||||
|
disable_poisson_packet_distribution: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Config {
|
Config {
|
||||||
average_ack_delay,
|
average_ack_delay,
|
||||||
average_packet_delay,
|
average_packet_delay,
|
||||||
average_message_sending_delay,
|
average_message_sending_delay,
|
||||||
|
disable_poisson_packet_distribution,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -63,7 +69,7 @@ where
|
|||||||
|
|
||||||
/// Internal state, determined by `average_message_sending_delay`,
|
/// Internal state, determined by `average_message_sending_delay`,
|
||||||
/// used to keep track of when a next packet should be sent out.
|
/// used to keep track of when a next packet should be sent out.
|
||||||
next_delay: Pin<Box<time::Sleep>>,
|
next_delay: Option<Pin<Box<time::Sleep>>>,
|
||||||
|
|
||||||
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
||||||
/// out to the network without any further delays.
|
/// out to the network without any further delays.
|
||||||
@@ -113,55 +119,6 @@ pub(crate) enum StreamMessage {
|
|||||||
Real(Box<RealMessage>),
|
Real(Box<RealMessage>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R> Stream for OutQueueControl<R>
|
|
||||||
where
|
|
||||||
R: CryptoRng + Rng + Unpin,
|
|
||||||
{
|
|
||||||
type Item = StreamMessage;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
// it is not yet time to return a message
|
|
||||||
if self.next_delay.as_mut().poll(cx).is_pending() {
|
|
||||||
return Poll::Pending;
|
|
||||||
};
|
|
||||||
|
|
||||||
// we know it's time to send a message, so let's prepare delay for the next one
|
|
||||||
// Get the `now` by looking at the current `delay` deadline
|
|
||||||
let avg_delay = self.config.average_message_sending_delay;
|
|
||||||
let now = self.next_delay.deadline();
|
|
||||||
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
|
||||||
|
|
||||||
// The next interval value is `next_poisson_delay` after the one that just
|
|
||||||
// yielded.
|
|
||||||
let next = now + next_poisson_delay;
|
|
||||||
self.next_delay.as_mut().reset(next);
|
|
||||||
|
|
||||||
// check if we have anything immediately available
|
|
||||||
if let Some(real_available) = self.received_buffer.pop_front() {
|
|
||||||
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
|
||||||
}
|
|
||||||
|
|
||||||
// decide what kind of message to send
|
|
||||||
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
|
||||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
|
||||||
// (and whoever is using the stream should panic)
|
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
|
||||||
|
|
||||||
// if there are more messages available, return first one and store the rest
|
|
||||||
Poll::Ready(Some(real_messages)) => {
|
|
||||||
self.received_buffer = real_messages.into();
|
|
||||||
// we MUST HAVE received at least ONE message
|
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
|
||||||
self.received_buffer.pop_front().unwrap(),
|
|
||||||
))))
|
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise construct a dummy one
|
|
||||||
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R> OutQueueControl<R>
|
impl<R> OutQueueControl<R>
|
||||||
where
|
where
|
||||||
R: CryptoRng + Rng + Unpin,
|
R: CryptoRng + Rng + Unpin,
|
||||||
@@ -184,7 +141,7 @@ where
|
|||||||
config,
|
config,
|
||||||
ack_key,
|
ack_key,
|
||||||
sent_notifier,
|
sent_notifier,
|
||||||
next_delay: Box::pin(time::sleep(Default::default())),
|
next_delay: None,
|
||||||
mix_tx,
|
mix_tx,
|
||||||
real_receiver,
|
real_receiver,
|
||||||
our_full_destination,
|
our_full_destination,
|
||||||
@@ -265,12 +222,6 @@ where
|
|||||||
|
|
||||||
// Send messages at certain rate and if no real traffic is available, send cover message.
|
// Send messages at certain rate and if no real traffic is available, send cover message.
|
||||||
async fn run_normal_out_queue(&mut self) {
|
async fn run_normal_out_queue(&mut self) {
|
||||||
// we should set initial delay only when we actually start the stream
|
|
||||||
self.next_delay = Box::pin(time::sleep(sample_poisson_duration(
|
|
||||||
&mut self.rng,
|
|
||||||
self.config.average_message_sending_delay,
|
|
||||||
)));
|
|
||||||
|
|
||||||
let mut shutdown = self.shutdown.clone();
|
let mut shutdown = self.shutdown.clone();
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -297,4 +248,110 @@ where
|
|||||||
debug!("Starting out queue controller...");
|
debug!("Starting out queue controller...");
|
||||||
self.run_normal_out_queue().await
|
self.run_normal_out_queue().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
|
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||||
|
// it is not yet time to return a message
|
||||||
|
if next_delay.as_mut().poll(cx).is_pending() {
|
||||||
|
return Poll::Pending;
|
||||||
|
};
|
||||||
|
|
||||||
|
// we know it's time to send a message, so let's prepare delay for the next one
|
||||||
|
// Get the `now` by looking at the current `delay` deadline
|
||||||
|
let avg_delay = self.config.average_message_sending_delay;
|
||||||
|
let now = next_delay.deadline();
|
||||||
|
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
||||||
|
|
||||||
|
// The next interval value is `next_poisson_delay` after the one that just
|
||||||
|
// yielded.
|
||||||
|
let next = now + next_poisson_delay;
|
||||||
|
next_delay.as_mut().reset(next);
|
||||||
|
|
||||||
|
// check if we have anything immediately available
|
||||||
|
if let Some(real_available) = self.received_buffer.pop_front() {
|
||||||
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// decide what kind of message to send
|
||||||
|
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
||||||
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||||
|
// (and whoever is using the stream should panic)
|
||||||
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
|
|
||||||
|
// if there are more messages available, return first one and store the rest
|
||||||
|
Poll::Ready(Some(real_messages)) => {
|
||||||
|
self.received_buffer = real_messages.into();
|
||||||
|
// we MUST HAVE received at least ONE message
|
||||||
|
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
||||||
|
self.received_buffer.pop_front().unwrap(),
|
||||||
|
))))
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise construct a dummy one
|
||||||
|
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// we never set an initial delay - let's do it now
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
|
||||||
|
self.next_delay = Some(Box::pin(time::sleep(sample_poisson_duration(
|
||||||
|
&mut self.rng,
|
||||||
|
self.config.average_message_sending_delay,
|
||||||
|
))));
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
|
// check if we have anything immediately available
|
||||||
|
if let Some(real_available) = self.received_buffer.pop_front() {
|
||||||
|
// if there are more messages immediately available, notify the runtime
|
||||||
|
// because we should be polled again
|
||||||
|
if !self.received_buffer.is_empty() {
|
||||||
|
cx.waker().wake_by_ref()
|
||||||
|
}
|
||||||
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
||||||
|
}
|
||||||
|
|
||||||
|
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
||||||
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||||
|
// (and whoever is using the stream should panic)
|
||||||
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
|
|
||||||
|
// if there are more messages available, return first one and store the rest
|
||||||
|
Poll::Ready(Some(real_messages)) => {
|
||||||
|
self.received_buffer = real_messages.into();
|
||||||
|
// we MUST HAVE received at least ONE message
|
||||||
|
Poll::Ready(Some(StreamMessage::Real(Box::new(
|
||||||
|
self.received_buffer.pop_front().unwrap(),
|
||||||
|
))))
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there's nothing, then there's nothing
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_next_message(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<StreamMessage>> {
|
||||||
|
if self.config.disable_poisson_packet_distribution {
|
||||||
|
self.poll_immediate(cx)
|
||||||
|
} else {
|
||||||
|
self.poll_poisson(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R> Stream for OutQueueControl<R>
|
||||||
|
where
|
||||||
|
R: CryptoRng + Rng + Unpin,
|
||||||
|
{
|
||||||
|
type Item = StreamMessage;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
self.poll_next_message(cx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,6 +73,11 @@ impl ReceivedMessagesBufferInner {
|
|||||||
if self.recently_reconstructed.contains(&fragment.id()) {
|
if self.recently_reconstructed.contains(&fragment.id()) {
|
||||||
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
|
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
|
||||||
return None;
|
return None;
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"Received a chunk of message ({:?}) for the first time",
|
||||||
|
fragment.id()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we returned an error the underlying message is malformed in some way
|
// if we returned an error the underlying message is malformed in some way
|
||||||
@@ -204,7 +209,7 @@ impl ReceivedMessagesBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
||||||
debug!(
|
trace!(
|
||||||
"Processing {:?} new message that might get added to the buffer!",
|
"Processing {:?} new message that might get added to the buffer!",
|
||||||
msgs.len()
|
msgs.len()
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -14,8 +14,10 @@ pub const MISSING_VALUE: &str = "MISSING VALUE";
|
|||||||
|
|
||||||
// 'DEBUG'
|
// 'DEBUG'
|
||||||
const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 1.5;
|
const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 1.5;
|
||||||
|
//const DEFAULT_ACK_WAIT_MULTIPLIER: f64 = 10.0;
|
||||||
|
|
||||||
const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(1_500);
|
//const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(1_500);
|
||||||
|
const DEFAULT_ACK_WAIT_ADDITION: Duration = Duration::from_millis(10_000);
|
||||||
const DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(200);
|
const DEFAULT_LOOP_COVER_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(200);
|
||||||
const DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(20);
|
const DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY: Duration = Duration::from_millis(20);
|
||||||
const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(50);
|
const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(50);
|
||||||
@@ -236,6 +238,10 @@ impl<T: NymConfig> Config<T> {
|
|||||||
self.debug.topology_resolution_timeout
|
self.debug.topology_resolution_timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_disabled_main_poisson_packet_distribution(&self) -> bool {
|
||||||
|
self.debug.disable_main_poisson_packet_distribution
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_version(&self) -> &str {
|
pub fn get_version(&self) -> &str {
|
||||||
&self.client.version
|
&self.client.version
|
||||||
}
|
}
|
||||||
@@ -445,6 +451,10 @@ pub struct Debug {
|
|||||||
/// did not reach its destination.
|
/// did not reach its destination.
|
||||||
#[serde(with = "humantime_serde")]
|
#[serde(with = "humantime_serde")]
|
||||||
topology_resolution_timeout: Duration,
|
topology_resolution_timeout: Duration,
|
||||||
|
|
||||||
|
/// Controls whether the main packet stream constantly produces packets according to the predefined
|
||||||
|
/// poisson distribution.
|
||||||
|
disable_main_poisson_packet_distribution: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Debug {
|
impl Default for Debug {
|
||||||
@@ -459,6 +469,7 @@ impl Default for Debug {
|
|||||||
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
|
gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT,
|
||||||
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
|
topology_refresh_rate: DEFAULT_TOPOLOGY_REFRESH_RATE,
|
||||||
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
|
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
|
||||||
|
disable_main_poisson_packet_distribution: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -121,6 +121,9 @@ impl NymClient {
|
|||||||
self.config.get_base().get_average_ack_delay(),
|
self.config.get_base().get_average_ack_delay(),
|
||||||
self.config.get_base().get_message_sending_average_delay(),
|
self.config.get_base().get_message_sending_average_delay(),
|
||||||
self.config.get_base().get_average_packet_delay(),
|
self.config.get_base().get_average_packet_delay(),
|
||||||
|
self.config
|
||||||
|
.get_base()
|
||||||
|
.get_disabled_main_poisson_packet_distribution(),
|
||||||
self.as_mix_recipient(),
|
self.as_mix_recipient(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -122,6 +122,9 @@ impl NymClient {
|
|||||||
self.config.get_base().get_average_ack_delay(),
|
self.config.get_base().get_average_ack_delay(),
|
||||||
self.config.get_base().get_message_sending_average_delay(),
|
self.config.get_base().get_message_sending_average_delay(),
|
||||||
self.config.get_base().get_average_packet_delay(),
|
self.config.get_base().get_average_packet_delay(),
|
||||||
|
self.config
|
||||||
|
.get_base()
|
||||||
|
.get_disabled_main_poisson_packet_distribution(),
|
||||||
self.as_mix_recipient(),
|
self.as_mix_recipient(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -52,5 +52,6 @@ fn setup_logging() {
|
|||||||
.filter_module("want", log::LevelFilter::Warn)
|
.filter_module("want", log::LevelFilter::Warn)
|
||||||
.filter_module("tungstenite", log::LevelFilter::Warn)
|
.filter_module("tungstenite", log::LevelFilter::Warn)
|
||||||
.filter_module("tokio_tungstenite", log::LevelFilter::Warn)
|
.filter_module("tokio_tungstenite", log::LevelFilter::Warn)
|
||||||
|
.filter_module("sled", log::LevelFilter::Warn)
|
||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ impl PacketRouter {
|
|||||||
} else if received_packet.len()
|
} else if received_packet.len()
|
||||||
== PacketSize::ExtendedPacket.plaintext_size() - ack_overhead
|
== PacketSize::ExtendedPacket.plaintext_size() - ack_overhead
|
||||||
{
|
{
|
||||||
warn!("received extended packet? Did not expect this...");
|
//warn!("received extended packet? Did not expect this...");
|
||||||
received_messages.push(received_packet);
|
received_messages.push(received_packet);
|
||||||
} else {
|
} else {
|
||||||
// this can happen if other clients are not padding their messages
|
// this can happen if other clients are not padding their messages
|
||||||
|
|||||||
@@ -271,6 +271,8 @@ where
|
|||||||
|
|
||||||
// including set of delays
|
// including set of delays
|
||||||
let delays = delays::generate_from_average_duration(route.len(), self.average_packet_delay);
|
let delays = delays::generate_from_average_duration(route.len(), self.average_packet_delay);
|
||||||
|
//let delays = vec![Delay::new_from_nanos(0); route.len()];
|
||||||
|
//assert!(delays_pre.len() == delays.len());
|
||||||
|
|
||||||
// create the actual sphinx packet here. With valid route and correct payload size,
|
// create the actual sphinx packet here. With valid route and correct payload size,
|
||||||
// there's absolutely no reason for this call to fail.
|
// there's absolutely no reason for this call to fail.
|
||||||
|
|||||||
Reference in New Issue
Block a user