Feature/multi surb transmission lanes (#2723)
* Preserve information about original transmission lanes when buffering reply packets * Attempting to send partial data in 'handle_send_reply' if we don't have enough surbs immediately * Display logging of reply surb request target * promoted reply_controller to a directory * moved channels and messages to separate file * simplifications due to rust 1.66 * Using a shoarthand for obtaining connection_id * made TransmissionBuffer generic * Moved transmissaion buffer to a higher level directory + defined wasm helpers * Using transmission buffer in reply controller * Using the pending replies size in lane lenghts queries * fixed an out of bounds use of fragments * Fixed dropped channel in getting lane queue length * Fixed an edge case failure for reply retransmissions * measuring (and logging) time it takes to obtain lane lenghts * decreased logging level * Removed non-wasm lock on total_size
This commit is contained in:
committed by
GitHub
parent
c4a68dbbe6
commit
32ad93c57e
@@ -3,6 +3,7 @@ name = "client-core"
|
||||
version = "1.1.4"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.66"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
|
||||
@@ -371,7 +371,7 @@ where
|
||||
|
||||
// channels responsible for dealing with reply-related fun
|
||||
let (reply_controller_sender, reply_controller_receiver) =
|
||||
reply_controller::new_control_channels();
|
||||
reply_controller::requests::new_control_channels();
|
||||
|
||||
let self_address = self.as_mix_recipient();
|
||||
|
||||
@@ -437,7 +437,7 @@ where
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
reply_storage,
|
||||
reply_controller_sender,
|
||||
reply_controller_sender.clone(),
|
||||
reply_controller_receiver,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
client_connection_rx,
|
||||
@@ -471,6 +471,7 @@ where
|
||||
received_buffer_request_sender,
|
||||
},
|
||||
},
|
||||
reply_controller_sender,
|
||||
task_manager,
|
||||
})
|
||||
}
|
||||
@@ -480,5 +481,8 @@ pub struct BaseClient {
|
||||
pub client_input: ClientInputStatus,
|
||||
pub client_output: ClientOutputStatus,
|
||||
|
||||
// it feels very wrong to put this channel here, but I can't think of any other way of passing it to the native client
|
||||
pub reply_controller_sender: ReplyControllerSender,
|
||||
|
||||
pub task_manager: TaskManager,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod non_wasm;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
mod wasm;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub use non_wasm::*;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub use wasm::*;
|
||||
@@ -0,0 +1,13 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub use tokio::time::*;
|
||||
pub type IntervalStream = tokio_stream::wrappers::IntervalStream;
|
||||
|
||||
pub(crate) fn get_time_now() -> Instant {
|
||||
Instant::now()
|
||||
}
|
||||
|
||||
pub(crate) fn new_interval_stream(polling_rate: Duration) -> IntervalStream {
|
||||
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(polling_rate))
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::time::Duration;
|
||||
use wasm_timer;
|
||||
|
||||
pub use wasm_timer::*;
|
||||
pub type IntervalStream = gloo_timers::future::IntervalStream;
|
||||
|
||||
pub(crate) fn get_time_now() -> Instant {
|
||||
wasm_timer::Instant::now()
|
||||
}
|
||||
|
||||
pub(crate) fn new_interval_stream(polling_rate: Duration) -> IntervalStream {
|
||||
gloo_timers::future::IntervalStream::new(polling_rate.as_millis() as u32)
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
pub mod base_client;
|
||||
pub mod cover_traffic_stream;
|
||||
pub(crate) mod helpers;
|
||||
pub mod inbound_messages;
|
||||
pub mod key_manager;
|
||||
pub mod mix_traffic;
|
||||
@@ -10,3 +11,4 @@ pub mod real_messages_control;
|
||||
pub mod received_buffer;
|
||||
pub mod replies;
|
||||
pub mod topology_control;
|
||||
pub(crate) mod transmission_buffer;
|
||||
|
||||
@@ -20,6 +20,7 @@ use nymsphinx::params::{PacketSize, DEFAULT_NUM_MIX_HOPS};
|
||||
use nymsphinx::preparer::{MessagePreparer, PreparedFragment};
|
||||
use nymsphinx::Delay;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
@@ -278,7 +279,7 @@ where
|
||||
reply_surb: ReplySurb,
|
||||
amount: u32,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
debug!("requesting {amount} reply SURBs from {from:?}");
|
||||
debug!("requesting {amount} reply SURBs from {from}");
|
||||
|
||||
let surbs_request =
|
||||
ReplyMessage::new_surb_request_message(self.config.sender_address, amount);
|
||||
@@ -294,19 +295,11 @@ where
|
||||
)))
|
||||
}
|
||||
|
||||
// the only difference between this method and `try_send_reply_chunks` is that
|
||||
// here we are not creating acks as acks are already in memory waiting to get cleared.
|
||||
// we are only updating their existing delays
|
||||
pub(crate) async fn try_send_retransmission_reply_chunks(
|
||||
pub(crate) async fn send_retransmission_reply_chunks(
|
||||
&mut self,
|
||||
fragments: Vec<Fragment>,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
prepared_fragments: Vec<PreparedFragment>,
|
||||
lane: TransmissionLane,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
let prepared_fragments = self
|
||||
.prepare_reply_chunks_for_sending(fragments.clone(), reply_surbs)
|
||||
.await?;
|
||||
|
||||
) {
|
||||
let mut real_messages = Vec::with_capacity(prepared_fragments.len());
|
||||
|
||||
for prepared in prepared_fragments {
|
||||
@@ -315,33 +308,58 @@ where
|
||||
}
|
||||
|
||||
self.forward_messages(real_messages, lane).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn try_send_reply_chunks(
|
||||
pub(crate) async fn try_send_reply_chunks_on_lane(
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
fragments: Vec<Fragment>,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
lane: TransmissionLane,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
// TODO: technically this is performing an unnecessary cloning, but in the grand scheme of things
|
||||
// is it really that bad?
|
||||
self.try_send_reply_chunks(
|
||||
target,
|
||||
fragments.into_iter().map(|f| (lane, f)).collect(),
|
||||
reply_surbs,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn try_send_reply_chunks(
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
fragments: Vec<(TransmissionLane, Fragment)>,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
let prepared_fragments = self
|
||||
.prepare_reply_chunks_for_sending(fragments.clone(), reply_surbs)
|
||||
.prepare_reply_chunks_for_sending(
|
||||
fragments.iter().map(|(_, f)| f.clone()).collect(),
|
||||
reply_surbs,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut pending_acks = Vec::with_capacity(fragments.len());
|
||||
let mut real_messages = Vec::with_capacity(fragments.len());
|
||||
let mut to_forward: HashMap<_, Vec<_>> = HashMap::new();
|
||||
|
||||
for (raw, prepared) in fragments.into_iter().zip(prepared_fragments.into_iter()) {
|
||||
let lane = raw.0;
|
||||
let fragment = raw.1;
|
||||
|
||||
let real_message = RealMessage::new(prepared.mix_packet, prepared.fragment_identifier);
|
||||
let delay = prepared.total_delay;
|
||||
let pending_ack = PendingAcknowledgement::new_anonymous(raw, delay, target, false);
|
||||
let pending_ack = PendingAcknowledgement::new_anonymous(fragment, delay, target, false);
|
||||
|
||||
real_messages.push(real_message);
|
||||
let entry = to_forward.entry(lane).or_default();
|
||||
entry.push(real_message);
|
||||
pending_acks.push(pending_ack);
|
||||
}
|
||||
|
||||
self.forward_messages(real_messages, lane).await;
|
||||
for (lane, real_messages) in to_forward {
|
||||
self.forward_messages(real_messages, lane).await;
|
||||
}
|
||||
|
||||
self.insert_pending_acks(pending_acks);
|
||||
Ok(())
|
||||
}
|
||||
@@ -467,7 +485,7 @@ where
|
||||
Ok(prepared_fragment)
|
||||
}
|
||||
|
||||
async fn prepare_reply_chunks_for_sending(
|
||||
pub(crate) async fn prepare_reply_chunks_for_sending(
|
||||
&mut self,
|
||||
fragments: Vec<Fragment>,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use self::sending_delay_controller::SendingDelayController;
|
||||
use crate::client::mix_traffic::BatchMixMessageSender;
|
||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
use client_connections::{
|
||||
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
@@ -29,22 +31,7 @@ use tokio::time;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_timer;
|
||||
|
||||
use self::{
|
||||
sending_delay_controller::SendingDelayController, transmission_buffer::TransmissionBuffer,
|
||||
};
|
||||
|
||||
mod sending_delay_controller;
|
||||
mod transmission_buffer;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn get_time_now() -> time::Instant {
|
||||
time::Instant::now()
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
fn get_time_now() -> wasm_timer::Instant {
|
||||
wasm_timer::Instant::now()
|
||||
}
|
||||
|
||||
/// Configurable parameters of the `OutQueueControl`
|
||||
pub(crate) struct Config {
|
||||
@@ -135,7 +122,7 @@ where
|
||||
|
||||
/// Buffer containing all incoming real messages keyed by transmission lane, that we will send
|
||||
/// out to the mixnet.
|
||||
transmission_buffer: TransmissionBuffer,
|
||||
transmission_buffer: TransmissionBuffer<RealMessage>,
|
||||
|
||||
/// Incoming channel for being notified of closed connections, so that we can close lanes
|
||||
/// corresponding to connections. To avoid sending traffic unnecessary
|
||||
@@ -162,6 +149,10 @@ impl From<PreparedFragment> for RealMessage {
|
||||
}
|
||||
|
||||
impl RealMessage {
|
||||
pub(crate) fn packet_size(&self) -> usize {
|
||||
self.mix_packet.sphinx_packet().len()
|
||||
}
|
||||
|
||||
pub(crate) fn new(mix_packet: MixPacket, fragment_id: FragmentIdentifier) -> Self {
|
||||
RealMessage {
|
||||
mix_packet,
|
||||
@@ -207,7 +198,7 @@ where
|
||||
real_receiver,
|
||||
rng,
|
||||
topology_access,
|
||||
transmission_buffer: Default::default(),
|
||||
transmission_buffer: TransmissionBuffer::new(),
|
||||
client_connection_rx,
|
||||
lane_queue_lengths,
|
||||
}
|
||||
@@ -328,7 +319,9 @@ where
|
||||
|
||||
fn pop_next_message(&mut self) -> Option<RealMessage> {
|
||||
// Pop the next message from the transmission buffer
|
||||
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
|
||||
let (lane, real_next) = self
|
||||
.transmission_buffer
|
||||
.pop_next_message_at_random(&mut self.rng)?;
|
||||
|
||||
// Update the published queue length
|
||||
let lane_length = self.transmission_buffer.lane_length(&lane);
|
||||
|
||||
+3
-16
@@ -1,14 +1,9 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use super::get_time_now;
|
||||
use crate::client::helpers::{get_time_now, Instant};
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_timer;
|
||||
|
||||
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
|
||||
// the available buffer space we want to take somewhat swift action, but we still need to give a
|
||||
// short time to give the channel a chance reduce pressure.
|
||||
@@ -39,19 +34,11 @@ pub(crate) struct SendingDelayController {
|
||||
lower_bound: u32,
|
||||
|
||||
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
time_when_changed: time::Instant,
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
time_when_changed: wasm_timer::Instant,
|
||||
time_when_changed: Instant,
|
||||
|
||||
/// If we have a long enough time without any backpressure detected we try reducing the sending
|
||||
/// delay multiplier
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
time_when_backpressure_detected: time::Instant,
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
time_when_backpressure_detected: wasm_timer::Instant,
|
||||
time_when_backpressure_detected: Instant,
|
||||
}
|
||||
|
||||
impl Default for SendingDelayController {
|
||||
|
||||
+155
-218
@@ -4,8 +4,8 @@
|
||||
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
|
||||
use crate::client::real_messages_control::message_handler::{MessageHandler, PreparationError};
|
||||
use crate::client::replies::reply_storage::CombinedReplyStorage;
|
||||
use client_connections::TransmissionLane;
|
||||
use futures::channel::mpsc;
|
||||
use client_connections::{ConnectionId, TransmissionLane};
|
||||
use futures::channel::oneshot;
|
||||
use futures::StreamExt;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
@@ -15,116 +15,16 @@ use nymsphinx::chunking::fragment::{Fragment, FragmentIdentifier};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
type IntervalStream = tokio_stream::wrappers::IntervalStream;
|
||||
use crate::client::helpers::new_interval_stream;
|
||||
use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
pub(crate) use requests::{ReplyControllerMessage, ReplyControllerReceiver, ReplyControllerSender};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
type IntervalStream = gloo_timers::future::IntervalStream;
|
||||
|
||||
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
(tx.into(), rx)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
|
||||
|
||||
impl From<mpsc::UnboundedSender<ReplyControllerMessage>> for ReplyControllerSender {
|
||||
fn from(inner: mpsc::UnboundedSender<ReplyControllerMessage>) -> Self {
|
||||
ReplyControllerSender(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplyControllerSender {
|
||||
pub(crate) fn send_retransmission_data(
|
||||
&self,
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::RetransmitReply {
|
||||
recipient,
|
||||
timed_out_ack,
|
||||
extra_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_reply(
|
||||
&self,
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::SendReply {
|
||||
recipient,
|
||||
message,
|
||||
lane,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs(
|
||||
&self,
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
|
||||
sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs_request(&self, recipient: Recipient, amount: u32) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
|
||||
recipient: Box::new(recipient),
|
||||
amount,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ReplyControllerMessage {
|
||||
RetransmitReply {
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
},
|
||||
|
||||
SendReply {
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
},
|
||||
|
||||
AdditionalSurbs {
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
},
|
||||
|
||||
// Should this also be handled in here? it's technically a completely different side of the pipe
|
||||
// let's see how it works when combined, might split it before creating PR
|
||||
AdditionalSurbsRequest {
|
||||
recipient: Box<Recipient>,
|
||||
amount: u32,
|
||||
},
|
||||
}
|
||||
pub mod requests;
|
||||
|
||||
pub struct Config {
|
||||
min_surb_request_size: u32,
|
||||
@@ -172,7 +72,7 @@ pub struct ReplyController<R> {
|
||||
// of surbs required to send the message through
|
||||
// expected_reliability: f32,
|
||||
request_receiver: ReplyControllerReceiver,
|
||||
pending_replies: HashMap<AnonymousSenderTag, VecDeque<Fragment>>,
|
||||
pending_replies: HashMap<AnonymousSenderTag, TransmissionBuffer<Fragment>>,
|
||||
|
||||
/// Retransmission packets that have already timed out and are waiting for additional reply SURBs
|
||||
/// so that they could be sent back to the network. Once we receive more SURBs, we should send them ASAP.
|
||||
@@ -204,17 +104,28 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts the pending replies into the BACK of the queue fn insert_pending_replies<V: Into<VecDeque<Fragment>>>(
|
||||
fn insert_pending_replies<V: Into<VecDeque<Fragment>>>(
|
||||
fn insert_pending_replies<I: IntoIterator<Item = Fragment>>(
|
||||
&mut self,
|
||||
recipient: &AnonymousSenderTag,
|
||||
fragments: V,
|
||||
fragments: I,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
if let Some(existing) = self.pending_replies.get_mut(recipient) {
|
||||
existing.append(&mut fragments.into())
|
||||
} else {
|
||||
self.pending_replies.insert(*recipient, fragments.into());
|
||||
}
|
||||
self.pending_replies
|
||||
.entry(*recipient)
|
||||
.or_insert_with(TransmissionBuffer::new)
|
||||
.store(&lane, fragments)
|
||||
}
|
||||
|
||||
fn re_insert_pending_replies(
|
||||
&mut self,
|
||||
recipient: &AnonymousSenderTag,
|
||||
fragments: Vec<(TransmissionLane, Fragment)>,
|
||||
) {
|
||||
// the buffer should ALWAYS exist at this point, if it doesn't, it's a bug...
|
||||
self.pending_replies
|
||||
.entry(*recipient)
|
||||
.or_insert_with(TransmissionBuffer::new)
|
||||
.store_multiple(fragments)
|
||||
}
|
||||
|
||||
fn re_insert_pending_retransmission(
|
||||
@@ -244,7 +155,7 @@ where
|
||||
let pending_queue_size = self
|
||||
.pending_replies
|
||||
.get(target)
|
||||
.map(|pending_queue| pending_queue.len())
|
||||
.map(|pending_queue| pending_queue.total_size())
|
||||
.unwrap_or_default();
|
||||
|
||||
let retransmission_queue = self
|
||||
@@ -299,43 +210,62 @@ where
|
||||
}
|
||||
|
||||
trace!("handling reply to {:?}", recipient_tag);
|
||||
let fragments = self.message_handler.split_reply_message(data);
|
||||
let mut fragments = self.message_handler.split_reply_message(data);
|
||||
let total_size = fragments.len();
|
||||
trace!("This reply requires {:?} SURBs", total_size);
|
||||
|
||||
let required_surbs = fragments.len();
|
||||
trace!("This reply requires {:?} SURBs", required_surbs);
|
||||
|
||||
// TODO: edge case:
|
||||
// we're making a lot of requests and have to request a lot of surbs
|
||||
// (but at some point we run out of surbs for surb requests)
|
||||
|
||||
let (surbs, _surbs_left) = self
|
||||
let available_surbs = self
|
||||
.full_reply_storage
|
||||
.surbs_storage_ref()
|
||||
.get_reply_surbs(&recipient_tag, required_surbs);
|
||||
.available_surbs(&recipient_tag);
|
||||
let min_surbs_threshold = self
|
||||
.full_reply_storage
|
||||
.surbs_storage_ref()
|
||||
.min_surb_threshold();
|
||||
|
||||
if let Some(reply_surbs) = surbs {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_reply_chunks(recipient_tag, fragments, reply_surbs, lane)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(
|
||||
self.full_reply_storage.surbs_storage_ref(),
|
||||
&recipient_tag,
|
||||
);
|
||||
warn!("failed to send reply to {:?} - {err}", recipient_tag);
|
||||
|
||||
// TODO: should we buffer that data to try again?
|
||||
}
|
||||
let max_to_send = if available_surbs > min_surbs_threshold {
|
||||
min(fragments.len(), available_surbs - min_surbs_threshold)
|
||||
} else {
|
||||
// we don't have enough surbs for this reply
|
||||
self.insert_pending_replies(&recipient_tag, fragments);
|
||||
0
|
||||
};
|
||||
|
||||
if self.should_request_more_surbs(&recipient_tag) {
|
||||
self.request_reply_surbs_for_queue_clearing(recipient_tag)
|
||||
.await;
|
||||
if max_to_send > 0 {
|
||||
let (surbs, _surbs_left) = self
|
||||
.full_reply_storage
|
||||
.surbs_storage_ref()
|
||||
.get_reply_surbs(&recipient_tag, max_to_send);
|
||||
|
||||
if let Some(reply_surbs) = surbs {
|
||||
let to_send = fragments.drain(..max_to_send).collect::<Vec<_>>();
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_reply_chunks_on_lane(
|
||||
recipient_tag,
|
||||
to_send.clone(),
|
||||
reply_surbs,
|
||||
lane,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(
|
||||
self.full_reply_storage.surbs_storage_ref(),
|
||||
&recipient_tag,
|
||||
);
|
||||
warn!("failed to send reply to {recipient_tag}: {err}");
|
||||
self.insert_pending_replies(&recipient_tag, to_send, lane);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if there's leftover data we didn't send because we didn't have enough (or any) surbs - buffer it
|
||||
if !fragments.is_empty() {
|
||||
self.insert_pending_replies(&recipient_tag, fragments, lane);
|
||||
}
|
||||
|
||||
if self.should_request_more_surbs(&recipient_tag) {
|
||||
self.request_reply_surbs_for_queue_clearing(recipient_tag)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_additional_reply_surbs(
|
||||
@@ -398,35 +328,18 @@ where
|
||||
};
|
||||
|
||||
let mut to_take = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
|
||||
// TODO: once rust 1.66.0 is stabilised on 15.12.22, just change it to
|
||||
// `.pop_front()` to directly take ownership
|
||||
for (k, data) in pending.iter() {
|
||||
let upgraded = match data.upgrade() {
|
||||
Some(upgraded) => upgraded,
|
||||
None => {
|
||||
// we got the ack while the data was waiting in the queue
|
||||
to_remove.push(*k);
|
||||
continue;
|
||||
while to_take.len() < max_to_clear {
|
||||
if let Some((_, data)) = pending.pop_first() {
|
||||
// no need to do anything if we failed to upgrade the reference,
|
||||
// it means we got the ack while the data was waiting in the queue
|
||||
if let Some(upgraded) = data.upgrade() {
|
||||
to_take.push(upgraded)
|
||||
}
|
||||
};
|
||||
|
||||
to_take.push(upgraded);
|
||||
|
||||
// we have taken as many entries as we could have
|
||||
if to_take.len() >= max_to_clear {
|
||||
} else {
|
||||
// our map is empty!
|
||||
break;
|
||||
}
|
||||
// TODO: use if upgraded.is_extra_surb_request() to bypass the limit
|
||||
}
|
||||
|
||||
for ack in &to_take {
|
||||
pending.remove(&ack.inner_fragment_identifier());
|
||||
}
|
||||
|
||||
for id in to_remove {
|
||||
pending.remove(&id);
|
||||
}
|
||||
|
||||
if to_take.is_empty() {
|
||||
@@ -447,46 +360,47 @@ where
|
||||
|
||||
let to_send_vec = to_take.iter().map(|ack| ack.fragment_data()).collect();
|
||||
|
||||
if let Err(err) = self
|
||||
let prepared_fragments = match self
|
||||
.message_handler
|
||||
.try_send_retransmission_reply_chunks(
|
||||
to_send_vec,
|
||||
surbs_for_reply,
|
||||
TransmissionLane::Retransmission,
|
||||
)
|
||||
.prepare_reply_chunks_for_sending(to_send_vec, surbs_for_reply)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(self.full_reply_storage.surbs_storage_ref(), &target);
|
||||
self.re_insert_pending_retransmission(&target, to_take);
|
||||
Ok(prepared) => prepared,
|
||||
Err(err) => {
|
||||
let err =
|
||||
err.return_unused_surbs(self.full_reply_storage.surbs_storage_ref(), &target);
|
||||
self.re_insert_pending_retransmission(&target, to_take);
|
||||
|
||||
warn!(
|
||||
"failed to clear pending retransmission queue for {:?} - {err}",
|
||||
target
|
||||
);
|
||||
}
|
||||
warn!(
|
||||
"failed to clear pending retransmission queue for {:?} - {err}",
|
||||
target
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// we can't fail at this point, so drop all references to acks so that timer updates wouldn't blow up
|
||||
drop(to_take);
|
||||
|
||||
self.message_handler
|
||||
.send_retransmission_reply_chunks(prepared_fragments, TransmissionLane::Retransmission)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn pop_at_most_pending_replies(
|
||||
&mut self,
|
||||
from: &AnonymousSenderTag,
|
||||
amount: usize,
|
||||
) -> Option<VecDeque<Fragment>> {
|
||||
) -> Option<Vec<(TransmissionLane, Fragment)>> {
|
||||
// if possible, pop all pending replies, if not, pop only entries for which we'd have a reply surb
|
||||
let total = self.pending_replies.get(from)?.len();
|
||||
let total = self.pending_replies.get(from)?.total_size();
|
||||
trace!("pending queue has {total} elements");
|
||||
if total == 0 {
|
||||
return None;
|
||||
}
|
||||
if total < amount {
|
||||
self.pending_replies.remove(from)
|
||||
} else {
|
||||
Some(
|
||||
self.pending_replies
|
||||
.get_mut(from)?
|
||||
.drain(..amount)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
self.pending_replies
|
||||
.get_mut(from)?
|
||||
.pop_at_most_n_next_messages_at_random(amount)
|
||||
}
|
||||
|
||||
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
|
||||
@@ -510,9 +424,9 @@ where
|
||||
|
||||
// we're guaranteed to not get more entries than we have reply surbs for
|
||||
if let Some(to_send) = self.pop_at_most_pending_replies(&target, max_to_clear) {
|
||||
let to_send_vec = to_send.iter().cloned().collect::<Vec<_>>();
|
||||
let to_send_clone = to_send.clone();
|
||||
|
||||
if to_send_vec.is_empty() {
|
||||
if to_send_clone.is_empty() {
|
||||
panic!(
|
||||
"please let the devs know if you ever see this message (reply_controller.rs)"
|
||||
);
|
||||
@@ -521,27 +435,22 @@ where
|
||||
let (surbs_for_reply, _) = self
|
||||
.full_reply_storage
|
||||
.surbs_storage_ref()
|
||||
.get_reply_surbs(&target, to_send_vec.len());
|
||||
.get_reply_surbs(&target, to_send_clone.len());
|
||||
|
||||
let Some(surbs_for_reply) = surbs_for_reply else {
|
||||
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
|
||||
self.insert_pending_replies(&target, to_send);
|
||||
self.re_insert_pending_replies(&target, to_send);
|
||||
return;
|
||||
};
|
||||
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_reply_chunks(
|
||||
target,
|
||||
to_send_vec,
|
||||
surbs_for_reply,
|
||||
TransmissionLane::General,
|
||||
)
|
||||
.try_send_reply_chunks(target, to_send_clone, surbs_for_reply)
|
||||
.await
|
||||
{
|
||||
let err =
|
||||
err.return_unused_surbs(self.full_reply_storage.surbs_storage_ref(), &target);
|
||||
self.insert_pending_replies(&target, to_send);
|
||||
self.re_insert_pending_replies(&target, to_send);
|
||||
warn!("failed to clear pending queue for {:?} - {err}", target);
|
||||
}
|
||||
} else {
|
||||
@@ -712,6 +621,30 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// to be honest this doesn't make a lot of sense in the context of `connection_id`,
|
||||
// it should really be asked per tag
|
||||
fn handle_lane_queue_length(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
response_channel: oneshot::Sender<usize>,
|
||||
) {
|
||||
// TODO: if we ever have duplicate ids for different senders, it means our rng is super weak
|
||||
// thus I don't think we have to worry about it?
|
||||
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||
for buf in self.pending_replies.values() {
|
||||
if let Some(length) = buf.lane_length(&lane) {
|
||||
if response_channel.send(length).is_err() {
|
||||
error!("the requester for lane queue length has dropped the response channel!")
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// make sure that if we didn't find that lane, we reply with 0
|
||||
if response_channel.send(0).is_err() {
|
||||
error!("the requester for lane queue length has dropped the response channel!")
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self, request: ReplyControllerMessage) {
|
||||
match request {
|
||||
ReplyControllerMessage::RetransmitReply {
|
||||
@@ -735,19 +668,26 @@ where
|
||||
self.handle_received_surbs(sender_tag, reply_surbs, from_surb_request)
|
||||
.await
|
||||
}
|
||||
ReplyControllerMessage::LaneQueueLength {
|
||||
connection_id,
|
||||
response_channel,
|
||||
} => self.handle_lane_queue_length(connection_id, response_channel),
|
||||
ReplyControllerMessage::AdditionalSurbsRequest { recipient, amount } => {
|
||||
self.handle_surb_request(*recipient, amount).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: modify this method to more accurately determine the amount of surbs it needs to request
|
||||
// it should take into consideration the average latency, sending rate and queue size.
|
||||
// it should request as many surbs as it takes to saturate its sending rate before next batch arrives
|
||||
async fn request_reply_surbs_for_queue_clearing(&mut self, target: AnonymousSenderTag) {
|
||||
trace!("requesting surbs for queues clearing");
|
||||
|
||||
let pending_queue_size = self
|
||||
.pending_replies
|
||||
.get(&target)
|
||||
.map(|pending_queue| pending_queue.len())
|
||||
.map(|pending_queue| pending_queue.total_size())
|
||||
.unwrap_or_default();
|
||||
|
||||
let retransmission_queue = self
|
||||
@@ -787,7 +727,7 @@ where
|
||||
}
|
||||
|
||||
let Some(last_received) = self.full_reply_storage.surbs_storage_ref().surbs_last_received_at(pending_reply_target) else {
|
||||
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", vals.len());
|
||||
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", vals.total_size());
|
||||
to_remove.push(*pending_reply_target);
|
||||
continue;
|
||||
};
|
||||
@@ -883,23 +823,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn create_interval_stream(polling_rate: Duration) -> IntervalStream {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
return tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(polling_rate));
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
return gloo_timers::future::IntervalStream::new(polling_rate.as_millis() as u32);
|
||||
}
|
||||
// #[cfg(not(target_arch = "wasm32"))]
|
||||
// async fn log_status(&self) {
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: task::TaskClient) {
|
||||
debug!("Started ReplyController with graceful shutdown support");
|
||||
|
||||
let polling_rate = Duration::from_secs(5);
|
||||
let mut stale_inspection = Self::create_interval_stream(polling_rate);
|
||||
let mut stale_inspection = new_interval_stream(polling_rate);
|
||||
|
||||
// this is in the order of hours/days so we don't have to poll it that often
|
||||
let polling_rate = Duration::from_secs(self.config.max_reply_surb_age.as_secs() / 10);
|
||||
let mut invalidation_inspection = Self::create_interval_stream(polling_rate);
|
||||
let mut invalidation_inspection = new_interval_stream(polling_rate);
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
@@ -0,0 +1,136 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
|
||||
use client_connections::{ConnectionId, TransmissionLane};
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use log::error;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nymsphinx::anonymous_replies::ReplySurb;
|
||||
use std::sync::Weak;
|
||||
|
||||
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
(tx.into(), rx)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
|
||||
|
||||
impl From<mpsc::UnboundedSender<ReplyControllerMessage>> for ReplyControllerSender {
|
||||
fn from(inner: mpsc::UnboundedSender<ReplyControllerMessage>) -> Self {
|
||||
ReplyControllerSender(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplyControllerSender {
|
||||
pub(crate) fn send_retransmission_data(
|
||||
&self,
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::RetransmitReply {
|
||||
recipient,
|
||||
timed_out_ack,
|
||||
extra_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_reply(
|
||||
&self,
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::SendReply {
|
||||
recipient,
|
||||
message,
|
||||
lane,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs(
|
||||
&self,
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
|
||||
sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs_request(&self, recipient: Recipient, amount: u32) {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
|
||||
recipient: Box::new(recipient),
|
||||
amount,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
}
|
||||
|
||||
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
|
||||
connection_id,
|
||||
response_channel: response_tx,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!");
|
||||
|
||||
match response_rx.await {
|
||||
Ok(length) => length,
|
||||
Err(_) => {
|
||||
error!("The reply controller has dropped our response channel!");
|
||||
// TODO: should we panic here instead? this message implies something weird and unrecoverable has happened
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ReplyControllerMessage {
|
||||
RetransmitReply {
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
},
|
||||
|
||||
SendReply {
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
},
|
||||
|
||||
AdditionalSurbs {
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
},
|
||||
|
||||
// this one doesn't belong here either...
|
||||
LaneQueueLength {
|
||||
connection_id: ConnectionId,
|
||||
response_channel: oneshot::Sender<usize>,
|
||||
},
|
||||
|
||||
// Should this also be handled in here? it's technically a completely different side of the pipe
|
||||
// let's see how it works when combined, might split it before creating PR
|
||||
AdditionalSurbsRequest {
|
||||
recipient: Box<Recipient>,
|
||||
amount: u32,
|
||||
},
|
||||
}
|
||||
+118
-48
@@ -1,38 +1,57 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::helpers::{get_time_now, Instant};
|
||||
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
|
||||
use client_connections::TransmissionLane;
|
||||
use nymsphinx::chunking::fragment::Fragment;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_timer;
|
||||
|
||||
use super::{get_time_now, RealMessage};
|
||||
|
||||
// The number of lanes included in the oldest set. Used when we need to prioritize traffic.
|
||||
const OLDEST_LANE_SET_SIZE: usize = 4;
|
||||
// As a way of prune connections we also check for timeouts.
|
||||
const MSG_CONSIDERED_STALE_AFTER_SECS: u64 = 10 * 60;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TransmissionBuffer {
|
||||
buffer: HashMap<TransmissionLane, LaneBufferEntry>,
|
||||
pub(crate) trait SizedData {
|
||||
fn data_size(&self) -> usize;
|
||||
}
|
||||
|
||||
impl TransmissionBuffer {
|
||||
impl SizedData for RealMessage {
|
||||
fn data_size(&self) -> usize {
|
||||
self.packet_size()
|
||||
}
|
||||
}
|
||||
|
||||
impl SizedData for Fragment {
|
||||
fn data_size(&self) -> usize {
|
||||
// note that raw `Fragment` is smaller than sphinx packet payload
|
||||
// as it doesn't include surb-ack or the [shared] key materials
|
||||
self.payload_size()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TransmissionBuffer<T> {
|
||||
buffer: HashMap<TransmissionLane, LaneBufferEntry<T>>,
|
||||
}
|
||||
|
||||
impl<T> TransmissionBuffer<T> {
|
||||
pub(crate) fn new() -> Self {
|
||||
TransmissionBuffer {
|
||||
buffer: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.buffer.is_empty()
|
||||
}
|
||||
|
||||
pub(crate) fn remove(&mut self, lane: &TransmissionLane) -> Option<LaneBufferEntry> {
|
||||
pub(crate) fn remove(&mut self, lane: &TransmissionLane) -> Option<LaneBufferEntry<T>> {
|
||||
self.buffer.remove(lane)
|
||||
}
|
||||
|
||||
@@ -57,20 +76,22 @@ impl TransmissionBuffer {
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) fn total_size(&self) -> usize {
|
||||
self.buffer.values().map(LaneBufferEntry::len).sum()
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) fn total_size_in_bytes(&self) -> usize {
|
||||
pub(crate) fn total_size_in_bytes(&self) -> usize
|
||||
where
|
||||
T: SizedData,
|
||||
{
|
||||
self.buffer
|
||||
.values()
|
||||
.map(|lane_buffer_entry| {
|
||||
lane_buffer_entry
|
||||
.real_messages
|
||||
.items
|
||||
.iter()
|
||||
.map(|real_message| real_message.mix_packet.sphinx_packet().len())
|
||||
.map(|item| item.data_size())
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum()
|
||||
@@ -92,42 +113,51 @@ impl TransmissionBuffer {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn store(&mut self, lane: &TransmissionLane, real_messages: Vec<RealMessage>) {
|
||||
pub(crate) fn store<I: IntoIterator<Item = T>>(&mut self, lane: &TransmissionLane, items: I) {
|
||||
if let Some(lane_buffer_entry) = self.buffer.get_mut(lane) {
|
||||
lane_buffer_entry.append(real_messages);
|
||||
lane_buffer_entry.extend(items);
|
||||
} else {
|
||||
self.buffer
|
||||
.insert(*lane, LaneBufferEntry::new(real_messages));
|
||||
.insert(*lane, LaneBufferEntry::new(items.into_iter().collect()));
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_random_lane(&self) -> Option<&TransmissionLane> {
|
||||
let lanes: Vec<&TransmissionLane> = self.buffer.keys().collect();
|
||||
lanes.choose(&mut rand::thread_rng()).copied()
|
||||
pub(crate) fn store_multiple(&mut self, items: Vec<(TransmissionLane, T)>) {
|
||||
for (lane, item) in items {
|
||||
self.buffer
|
||||
.entry(lane)
|
||||
.or_insert_with(LaneBufferEntry::new_empty)
|
||||
.push_item(item)
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_random_small_lane(&self) -> Option<&TransmissionLane> {
|
||||
fn pick_random_lane<R: Rng + ?Sized>(&self, rng: &mut R) -> Option<&TransmissionLane> {
|
||||
let lanes: Vec<&TransmissionLane> = self.buffer.keys().collect();
|
||||
lanes.choose(rng).copied()
|
||||
}
|
||||
|
||||
fn pick_random_small_lane<R: Rng + ?Sized>(&self, rng: &mut R) -> Option<&TransmissionLane> {
|
||||
let lanes: Vec<&TransmissionLane> = self
|
||||
.buffer
|
||||
.iter()
|
||||
.filter(|(_, v)| v.is_small())
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
lanes.choose(&mut rand::thread_rng()).copied()
|
||||
lanes.choose(rng).copied()
|
||||
}
|
||||
|
||||
// 2/3 chance to pick from the old lanes
|
||||
fn pick_random_old_lane(&self) -> Option<TransmissionLane> {
|
||||
fn pick_random_old_lane<R: Rng + ?Sized>(&self, rng: &mut R) -> Option<TransmissionLane> {
|
||||
let rand = &mut rand::thread_rng();
|
||||
if rand.gen_ratio(2, 3) {
|
||||
let lanes = self.get_oldest_set();
|
||||
lanes.choose(rand).copied()
|
||||
} else {
|
||||
self.pick_random_lane().copied()
|
||||
self.pick_random_lane(rng).copied()
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_front_from_lane(&mut self, lane: &TransmissionLane) -> Option<RealMessage> {
|
||||
fn pop_front_from_lane(&mut self, lane: &TransmissionLane) -> Option<T> {
|
||||
let real_msgs_queued = self.buffer.get_mut(lane)?;
|
||||
let real_next = real_msgs_queued.pop_front()?;
|
||||
real_msgs_queued.messages_transmitted += 1;
|
||||
@@ -137,19 +167,48 @@ impl TransmissionBuffer {
|
||||
Some(real_next)
|
||||
}
|
||||
|
||||
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
|
||||
pub(crate) fn pop_at_most_n_next_messages_at_random(
|
||||
&mut self,
|
||||
n: usize,
|
||||
) -> Option<Vec<(TransmissionLane, T)>> {
|
||||
// let start = Instant::now();
|
||||
|
||||
if self.buffer.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let rng = &mut rand::thread_rng();
|
||||
let mut items = Vec::with_capacity(n);
|
||||
|
||||
while items.len() < n {
|
||||
let Some(next) = self.pop_next_message_at_random(rng) else {
|
||||
break
|
||||
};
|
||||
items.push(next)
|
||||
}
|
||||
|
||||
// todo!("time time taken");
|
||||
|
||||
Some(items)
|
||||
}
|
||||
|
||||
pub(crate) fn pop_next_message_at_random<R: Rng + ?Sized>(
|
||||
&mut self,
|
||||
// turns out the caller always have access to some rng, so no point in instantiating new one
|
||||
rng: &mut R,
|
||||
) -> Option<(TransmissionLane, T)> {
|
||||
if self.buffer.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Very basic heuristic where we prioritize according to small lanes first, the older lanes
|
||||
// to try to finish lanes when possible, then the rest.
|
||||
let lane = if let Some(small_lane) = self.pick_random_small_lane() {
|
||||
let lane = if let Some(small_lane) = self.pick_random_small_lane(rng) {
|
||||
*small_lane
|
||||
} else if let Some(old_lane) = self.pick_random_old_lane() {
|
||||
} else if let Some(old_lane) = self.pick_random_old_lane(rng) {
|
||||
old_lane
|
||||
} else {
|
||||
*self.pick_random_lane()?
|
||||
*self.pick_random_lane(rng)?
|
||||
};
|
||||
|
||||
let msg = self.pop_front_from_lane(&lane)?;
|
||||
@@ -171,35 +230,46 @@ impl TransmissionBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LaneBufferEntry {
|
||||
pub real_messages: VecDeque<RealMessage>,
|
||||
pub(crate) struct LaneBufferEntry<T> {
|
||||
pub items: VecDeque<T>,
|
||||
pub messages_transmitted: usize,
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub time_for_last_activity: time::Instant,
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub time_for_last_activity: wasm_timer::Instant,
|
||||
pub time_for_last_activity: Instant,
|
||||
}
|
||||
|
||||
impl LaneBufferEntry {
|
||||
fn new(real_messages: Vec<RealMessage>) -> Self {
|
||||
impl<T> LaneBufferEntry<T> {
|
||||
fn new_empty() -> Self {
|
||||
LaneBufferEntry {
|
||||
real_messages: real_messages.into(),
|
||||
items: VecDeque::new(),
|
||||
messages_transmitted: 0,
|
||||
time_for_last_activity: get_time_now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn append(&mut self, real_messages: Vec<RealMessage>) {
|
||||
self.real_messages.append(&mut real_messages.into());
|
||||
fn new(items: VecDeque<T>) -> Self {
|
||||
LaneBufferEntry {
|
||||
items,
|
||||
messages_transmitted: 0,
|
||||
time_for_last_activity: get_time_now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_item(&mut self, item: T) {
|
||||
self.items.push_back(item);
|
||||
// I'm not updating time here on purpose. This method is called just after `new_empty`,
|
||||
// where the time is already set. Furthermore, this method is called there multiple times at once
|
||||
}
|
||||
|
||||
fn extend<I: IntoIterator<Item = T>>(&mut self, items: I) {
|
||||
self.items.extend(items);
|
||||
self.time_for_last_activity = get_time_now();
|
||||
}
|
||||
|
||||
fn pop_front(&mut self) -> Option<RealMessage> {
|
||||
self.real_messages.pop_front()
|
||||
fn pop_front(&mut self) -> Option<T> {
|
||||
self.items.pop_front()
|
||||
}
|
||||
|
||||
fn is_small(&self) -> bool {
|
||||
self.real_messages.len() < 100
|
||||
self.items.len() < 100
|
||||
}
|
||||
|
||||
fn is_stale(&self) -> bool {
|
||||
@@ -208,10 +278,10 @@ impl LaneBufferEntry {
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.real_messages.len()
|
||||
self.items.len()
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.real_messages.is_empty()
|
||||
self.items.is_empty()
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ use client_core::client::base_client::{
|
||||
use client_core::client::inbound_messages::InputMessage;
|
||||
use client_core::client::key_manager::KeyManager;
|
||||
use client_core::client::received_buffer::{ReceivedBufferMessage, ReconstructedMessagesReceiver};
|
||||
use client_core::client::replies::reply_controller::requests::ReplyControllerSender;
|
||||
use client_core::config::persistence::key_pathfinder::ClientKeyPathfinder;
|
||||
use futures::channel::mpsc;
|
||||
use gateway_client::bandwidth::BandwidthController;
|
||||
@@ -87,6 +88,7 @@ impl SocketClient {
|
||||
client_input: ClientInput,
|
||||
client_output: ClientOutput,
|
||||
self_address: &Recipient,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
shutdown: task::TaskClient,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
@@ -107,6 +109,7 @@ impl SocketClient {
|
||||
received_buffer_request_sender,
|
||||
self_address,
|
||||
shared_lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
);
|
||||
|
||||
websocket::Listener::new(config.get_listening_port()).start(websocket_handler, shutdown);
|
||||
@@ -154,6 +157,7 @@ impl SocketClient {
|
||||
client_input,
|
||||
client_output,
|
||||
&self_address,
|
||||
started_client.reply_controller_sender,
|
||||
started_client.task_manager.subscribe(),
|
||||
);
|
||||
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::{
|
||||
ConnectionCommand, ConnectionCommandSender, LaneQueueLengths, TransmissionLane,
|
||||
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use client_core::client::replies::reply_controller::requests::ReplyControllerSender;
|
||||
use client_core::client::{
|
||||
inbound_messages::{InputMessage, InputMessageSender},
|
||||
received_buffer::{
|
||||
@@ -16,7 +17,9 @@ use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_tungstenite::{
|
||||
accept_async,
|
||||
tungstenite::{protocol::Message as WsMessage, Error as WsError},
|
||||
@@ -41,6 +44,7 @@ pub(crate) struct HandlerBuilder {
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
self_full_address: Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
}
|
||||
|
||||
impl HandlerBuilder {
|
||||
@@ -50,6 +54,7 @@ impl HandlerBuilder {
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
self_full_address: &Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
) -> Self {
|
||||
Self {
|
||||
msg_input,
|
||||
@@ -57,6 +62,7 @@ impl HandlerBuilder {
|
||||
buffer_requester,
|
||||
self_full_address: *self_full_address,
|
||||
lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +76,7 @@ impl HandlerBuilder {
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||
reply_controller_sender: self.reply_controller_sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,6 +89,7 @@ pub(crate) struct Handler {
|
||||
socket: Option<WebSocketStream<TcpStream>>,
|
||||
received_response_type: ReceivedResponseType,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
}
|
||||
|
||||
impl Drop for Handler {
|
||||
@@ -97,6 +105,48 @@ impl Drop for Handler {
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> Option<ServerResponse> {
|
||||
let req_start = Instant::now();
|
||||
|
||||
// get the base queue length
|
||||
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||
let conn_lane = TransmissionLane::ConnectionId(connection_id);
|
||||
let Ok(base_length) = self
|
||||
.lane_queue_lengths
|
||||
.lock()
|
||||
.map(|guard| guard.get(&conn_lane).unwrap_or_default()) else {
|
||||
// I'd argue we should panic here as this error it not recoverable
|
||||
error!("The lane queue length lock is poisoned!!");
|
||||
return None
|
||||
};
|
||||
|
||||
// get the number of pending replies waiting for reply surbs
|
||||
let reply_queue_length = self
|
||||
.reply_controller_sender
|
||||
.get_lane_queue_length(connection_id)
|
||||
.await;
|
||||
|
||||
let queue_length = base_length + reply_queue_length;
|
||||
|
||||
let time_taken = req_start.elapsed();
|
||||
let msg =
|
||||
format!("it took {time_taken:?} to get lane length for connection {connection_id}. The length is: {queue_length} = {base_length} (already queued up) + {reply_queue_length} (waiting for reply SURBs)");
|
||||
|
||||
if time_taken > Duration::from_millis(1) {
|
||||
info!("{msg}");
|
||||
} else if time_taken > Duration::from_millis(10) {
|
||||
warn!("{msg}");
|
||||
} else if time_taken > Duration::from_millis(50) {
|
||||
error!("{msg}");
|
||||
}
|
||||
|
||||
Some(ServerResponse::LaneQueueLength {
|
||||
lane: connection_id,
|
||||
queue_length,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_send(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
@@ -121,27 +171,11 @@ impl Handler {
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let connection_id = match lane {
|
||||
TransmissionLane::General
|
||||
| TransmissionLane::ReplySurbRequest
|
||||
| TransmissionLane::Retransmission
|
||||
| TransmissionLane::AdditionalReplySurbs => return None,
|
||||
TransmissionLane::ConnectionId(id) => id,
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
return None
|
||||
};
|
||||
|
||||
// on receiving a send, we reply back the current lane queue length for that connection id.
|
||||
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
return Some(ServerResponse::LaneQueueLength {
|
||||
lane: connection_id,
|
||||
queue_length,
|
||||
});
|
||||
}
|
||||
|
||||
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
|
||||
None
|
||||
self.get_lane_queue_length(connection_id).await
|
||||
}
|
||||
|
||||
async fn handle_send_anonymous(
|
||||
@@ -168,27 +202,11 @@ impl Handler {
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let connection_id = match lane {
|
||||
TransmissionLane::General
|
||||
| TransmissionLane::ReplySurbRequest
|
||||
| TransmissionLane::Retransmission
|
||||
| TransmissionLane::AdditionalReplySurbs => return None,
|
||||
TransmissionLane::ConnectionId(id) => id,
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
return None
|
||||
};
|
||||
|
||||
// on receiving a send, we reply back the current lane queue length for that connection id.
|
||||
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
return Some(ServerResponse::LaneQueueLength {
|
||||
lane: connection_id,
|
||||
queue_length,
|
||||
});
|
||||
}
|
||||
|
||||
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
|
||||
None
|
||||
self.get_lane_queue_length(connection_id).await
|
||||
}
|
||||
|
||||
async fn handle_reply(
|
||||
@@ -211,27 +229,11 @@ impl Handler {
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let connection_id = match lane {
|
||||
TransmissionLane::General
|
||||
| TransmissionLane::ReplySurbRequest
|
||||
| TransmissionLane::Retransmission
|
||||
| TransmissionLane::AdditionalReplySurbs => return None,
|
||||
TransmissionLane::ConnectionId(id) => id,
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
return None
|
||||
};
|
||||
|
||||
// on receiving a send, we reply back the current lane queue length for that connection id.
|
||||
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
return Some(ServerResponse::LaneQueueLength {
|
||||
lane: connection_id,
|
||||
queue_length,
|
||||
});
|
||||
}
|
||||
|
||||
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
|
||||
None
|
||||
self.get_lane_queue_length(connection_id).await
|
||||
}
|
||||
|
||||
fn handle_self_address(&self) -> ServerResponse {
|
||||
@@ -245,20 +247,8 @@ impl Handler {
|
||||
None
|
||||
}
|
||||
|
||||
fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
|
||||
log::warn!(
|
||||
"Failed to get the lane queue length lock, not responding back with the current queue length"
|
||||
);
|
||||
return None;
|
||||
};
|
||||
|
||||
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
Some(ServerResponse::LaneQueueLength {
|
||||
lane: connection_id,
|
||||
queue_length,
|
||||
})
|
||||
async fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
self.get_lane_queue_length(connection_id).await
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||
@@ -287,7 +277,7 @@ impl Handler {
|
||||
|
||||
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
||||
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
|
||||
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id),
|
||||
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub type ConnectionId = u64;
|
||||
|
||||
|
||||
@@ -208,6 +208,11 @@ impl Fragment {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the size of payload contained in this `Fragment`.
|
||||
pub fn payload_size(&self) -> usize {
|
||||
self.payload.len()
|
||||
}
|
||||
|
||||
/// Extracts id of this `Fragment`.
|
||||
pub fn id(&self) -> i32 {
|
||||
self.header.id
|
||||
|
||||
Reference in New Issue
Block a user