Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3bbc812c2c | |||
| b2fb2c79b2 | |||
| 2df817082b |
Generated
+2
@@ -5960,6 +5960,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"nym-bin-common",
|
||||
"nym-crypto",
|
||||
"nym-sdk",
|
||||
"nym-service-provider-requests-common",
|
||||
"nym-sphinx",
|
||||
"rand 0.8.5",
|
||||
@@ -5968,6 +5969,7 @@ dependencies = [
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -117,7 +117,7 @@ impl Drop for Handler {
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> Option<ServerResponse> {
|
||||
async fn get_lane_queue_length(&mut self, connection_id: ConnectionId) -> Option<ServerResponse> {
|
||||
let req_start = Instant::now();
|
||||
|
||||
// get the base queue length
|
||||
@@ -282,7 +282,7 @@ impl Handler {
|
||||
None
|
||||
}
|
||||
|
||||
async fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
async fn handle_get_lane_queue_length(&mut self, connection_id: u64) -> Option<ServerResponse> {
|
||||
self.get_lane_queue_length(connection_id).await
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;
|
||||
pub mod transceiver;
|
||||
|
||||
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
|
||||
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
|
||||
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 5;
|
||||
const MAX_FAILURE_COUNT: usize = 100;
|
||||
|
||||
// that's also disgusting.
|
||||
@@ -96,6 +96,7 @@ impl MixTrafficController {
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
info!("on_messages: received {} mix_packets", mix_packets.len());
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
let mix_packet = mix_packets.pop().unwrap();
|
||||
|
||||
+17
-2
@@ -69,7 +69,7 @@ where
|
||||
// offload reply handling to the dedicated task
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_reply(recipient_tag, data, lane)
|
||||
.send_reply(recipient_tag, data, lane).await
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("failed to send a reply - {err}");
|
||||
@@ -111,12 +111,17 @@ where
|
||||
}
|
||||
|
||||
async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
log::info!(
|
||||
"input_message_listener: received a message: {:?}",
|
||||
msg.lane()
|
||||
);
|
||||
match msg {
|
||||
InputMessage::Regular {
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: regular");
|
||||
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
|
||||
.await
|
||||
}
|
||||
@@ -126,6 +131,7 @@ where
|
||||
reply_surbs,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: anonymous");
|
||||
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
|
||||
.await
|
||||
}
|
||||
@@ -134,9 +140,13 @@ where
|
||||
data,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: reply");
|
||||
self.handle_reply(recipient_tag, data, lane).await;
|
||||
}
|
||||
InputMessage::Premade { msgs, lane } => self.handle_premade_packets(msgs, lane).await,
|
||||
InputMessage::Premade { msgs, lane } => {
|
||||
log::info!("input_message_listener: premade");
|
||||
self.handle_premade_packets(msgs, lane).await
|
||||
}
|
||||
InputMessage::MessageWrapper {
|
||||
message,
|
||||
packet_type,
|
||||
@@ -146,6 +156,8 @@ where
|
||||
data,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: wrapper regular");
|
||||
|
||||
self.handle_plain_message(recipient, data, lane, packet_type)
|
||||
.await
|
||||
}
|
||||
@@ -155,6 +167,7 @@ where
|
||||
reply_surbs,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: wrapper anonymous");
|
||||
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
|
||||
.await
|
||||
}
|
||||
@@ -163,9 +176,11 @@ where
|
||||
data,
|
||||
lane,
|
||||
} => {
|
||||
log::info!("input_message_listener: wrapper reply");
|
||||
self.handle_reply(recipient_tag, data, lane).await;
|
||||
}
|
||||
InputMessage::Premade { msgs, lane } => {
|
||||
log::info!("input_message_listener: wrapper premade");
|
||||
self.handle_premade_packets(msgs, lane).await
|
||||
}
|
||||
// MessageWrappers can't be nested
|
||||
|
||||
+9
-5
@@ -99,11 +99,15 @@ where
|
||||
} => {
|
||||
// if this is retransmission for reply, offload it to the dedicated task
|
||||
// that deals with all the surbs
|
||||
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
|
||||
*recipient_tag,
|
||||
weak_timed_out_ack,
|
||||
*extra_surb_request,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_retransmission_data(
|
||||
*recipient_tag,
|
||||
weak_timed_out_ack,
|
||||
*extra_surb_request,
|
||||
)
|
||||
.await
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send retransmission data to the reply controller: {err}");
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ where
|
||||
let (next_message, fragment_id, packet_size) = match next_message {
|
||||
StreamMessage::Cover => {
|
||||
let cover_traffic_packet_size = self.loop_cover_message_size();
|
||||
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
|
||||
info!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
|
||||
|
||||
// TODO for way down the line: in very rare cases (during topology update) we might have
|
||||
// to wait a really tiny bit before actually obtaining the permit hence messing with our
|
||||
@@ -277,6 +277,8 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
let mix_tx_capacity = self.mix_tx.capacity();
|
||||
log::info!("mix_tx_capacity: {mix_tx_capacity}");
|
||||
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send: {err}");
|
||||
@@ -536,13 +538,13 @@ where
|
||||
format!("Packet backlog: {lane_status}, avg delay: {delay}ms ({mult})")
|
||||
};
|
||||
|
||||
if packets > 1000 {
|
||||
log::warn!("{status_str}");
|
||||
} else if packets > 0 {
|
||||
// if packets > 1000 {
|
||||
// log::warn!("{status_str}");
|
||||
// } else if packets > 0 {
|
||||
log::info!("{status_str}");
|
||||
} else {
|
||||
log::debug!("{status_str}");
|
||||
}
|
||||
// } else {
|
||||
// log::debug!("{status_str}");
|
||||
// }
|
||||
|
||||
// Send status message to whoever is listening (possibly UI)
|
||||
if mult == self.sending_delay_controller.max_multiplier() {
|
||||
@@ -559,7 +561,7 @@ where
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(1));
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
@@ -572,6 +574,7 @@ where
|
||||
self.log_status(&mut shutdown);
|
||||
}
|
||||
next_message = self.next() => if let Some(next_message) = next_message {
|
||||
self.log_status(&mut shutdown);
|
||||
self.on_message(next_message).await;
|
||||
} else {
|
||||
log::trace!("OutQueueControl: Stopping since channel closed");
|
||||
|
||||
@@ -224,7 +224,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
msgs.into_iter().map(Into::into).collect()
|
||||
}
|
||||
|
||||
fn handle_reconstructed_repliable_messages(
|
||||
async fn handle_reconstructed_repliable_messages(
|
||||
&mut self,
|
||||
msgs: Vec<RepliableMessage>,
|
||||
) -> Vec<ReconstructedMessage> {
|
||||
@@ -261,11 +261,11 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
|
||||
msg.sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_additional_surbs(msg.sender_tag, reply_surbs, from_surb_request)
|
||||
.await
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
}
|
||||
@@ -274,7 +274,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
reconstructed
|
||||
}
|
||||
|
||||
fn handle_reconstructed_reply_messages(
|
||||
async fn handle_reconstructed_reply_messages(
|
||||
&mut self,
|
||||
msgs: Vec<ReplyMessage>,
|
||||
) -> Vec<ReconstructedMessage> {
|
||||
@@ -287,6 +287,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_additional_surbs_request(*recipient, amount)
|
||||
.await
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
@@ -316,10 +317,16 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
}
|
||||
|
||||
let mut reconstructed_messages = self.handle_reconstructed_plain_messages(plain_messages);
|
||||
reconstructed_messages
|
||||
.append(&mut self.handle_reconstructed_repliable_messages(repliable_messages));
|
||||
reconstructed_messages
|
||||
.append(&mut self.handle_reconstructed_reply_messages(reply_messages));
|
||||
reconstructed_messages.append(
|
||||
&mut self
|
||||
.handle_reconstructed_repliable_messages(repliable_messages)
|
||||
.await,
|
||||
);
|
||||
reconstructed_messages.append(
|
||||
&mut self
|
||||
.handle_reconstructed_reply_messages(reply_messages)
|
||||
.await,
|
||||
);
|
||||
|
||||
let mut inner_guard = self.inner.lock().await;
|
||||
debug!(
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
SinkExt,
|
||||
};
|
||||
use log::error;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
@@ -11,117 +14,128 @@ use nym_task::connections::{ConnectionId, TransmissionLane};
|
||||
use std::sync::Weak;
|
||||
|
||||
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let (tx, rx) = mpsc::channel(8);
|
||||
(tx.into(), rx)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ReplyControllerSenderError {
|
||||
#[error("failed to send retransmission data to reply controller")]
|
||||
SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
// SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
SendRetransmissionData,
|
||||
|
||||
#[error("failed to send reply to reply controller")]
|
||||
SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
// SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
SendReply,
|
||||
|
||||
#[error("failed to send additional surbs to reply controller")]
|
||||
AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
// AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
AdditionalSurbs,
|
||||
|
||||
#[error("failed to send additional surbs request to reply controller")]
|
||||
AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
// AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
AdditionalSurbsRequest,
|
||||
|
||||
#[error("failed to request lane queue length from reply controller")]
|
||||
LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
// LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
LaneQueueLength,
|
||||
|
||||
#[error("response channel was dropped before we could receive the response")]
|
||||
ResponseChannelDropped(#[source] oneshot::Canceled),
|
||||
// ResponseChannelDropped(#[source] oneshot::Canceled),
|
||||
ResponseChannelDropped,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
|
||||
pub struct ReplyControllerSender(mpsc::Sender<ReplyControllerMessage>);
|
||||
|
||||
impl From<mpsc::UnboundedSender<ReplyControllerMessage>> for ReplyControllerSender {
|
||||
fn from(inner: mpsc::UnboundedSender<ReplyControllerMessage>) -> Self {
|
||||
impl From<mpsc::Sender<ReplyControllerMessage>> for ReplyControllerSender {
|
||||
fn from(inner: mpsc::Sender<ReplyControllerMessage>) -> Self {
|
||||
ReplyControllerSender(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl ReplyControllerSender {
|
||||
pub(crate) fn send_retransmission_data(
|
||||
&self,
|
||||
pub(crate) async fn send_retransmission_data(
|
||||
&mut self,
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::RetransmitReply {
|
||||
.send(ReplyControllerMessage::RetransmitReply {
|
||||
recipient,
|
||||
timed_out_ack,
|
||||
extra_surb_request,
|
||||
})
|
||||
.map_err(ReplyControllerSenderError::SendRetransmissionData)
|
||||
.await
|
||||
.map_err(|_| ReplyControllerSenderError::SendRetransmissionData)
|
||||
}
|
||||
|
||||
pub(crate) fn send_reply(
|
||||
&self,
|
||||
pub(crate) async fn send_reply(
|
||||
&mut self,
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::SendReply {
|
||||
.send(ReplyControllerMessage::SendReply {
|
||||
recipient,
|
||||
message,
|
||||
lane,
|
||||
})
|
||||
.map_err(ReplyControllerSenderError::SendReply)
|
||||
.await
|
||||
.map_err(|_| ReplyControllerSenderError::SendReply)
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs(
|
||||
&self,
|
||||
pub(crate) async fn send_additional_surbs(
|
||||
&mut self,
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
|
||||
.send(ReplyControllerMessage::AdditionalSurbs {
|
||||
sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
})
|
||||
.map_err(ReplyControllerSenderError::AdditionalSurbs)
|
||||
.await
|
||||
.map_err(|_| ReplyControllerSenderError::AdditionalSurbs)
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs_request(
|
||||
&self,
|
||||
pub(crate) async fn send_additional_surbs_request(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
amount: u32,
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
|
||||
.send(ReplyControllerMessage::AdditionalSurbsRequest {
|
||||
recipient: Box::new(recipient),
|
||||
amount,
|
||||
})
|
||||
.map_err(ReplyControllerSenderError::AdditionalSurbsRequest)
|
||||
.await
|
||||
.map_err(|_| ReplyControllerSenderError::AdditionalSurbsRequest)
|
||||
}
|
||||
|
||||
pub async fn get_lane_queue_length(
|
||||
&self,
|
||||
&mut self,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<usize, ReplyControllerSenderError> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
if let Err(err) = self
|
||||
if let Err(_err) = self
|
||||
.0
|
||||
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
|
||||
.send(ReplyControllerMessage::LaneQueueLength {
|
||||
connection_id,
|
||||
response_channel: response_tx,
|
||||
})
|
||||
.await
|
||||
{
|
||||
return Err(ReplyControllerSenderError::LaneQueueLength(err));
|
||||
return Err(ReplyControllerSenderError::LaneQueueLength);
|
||||
}
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(ReplyControllerSenderError::ResponseChannelDropped)
|
||||
.map_err(|_| ReplyControllerSenderError::ResponseChannelDropped)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,7 +151,7 @@ impl ReplyQueueLengths {
|
||||
}
|
||||
|
||||
pub async fn get_lane_queue_length(
|
||||
&self,
|
||||
&mut self,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<usize, ReplyControllerSenderError> {
|
||||
self.reply_controller_sender
|
||||
@@ -146,7 +160,7 @@ impl ReplyQueueLengths {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
|
||||
pub(crate) type ReplyControllerReceiver = mpsc::Receiver<ReplyControllerMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ReplyControllerMessage {
|
||||
|
||||
@@ -21,3 +21,5 @@ thiserror = { workspace = true }
|
||||
time = { workspace = true }
|
||||
tokio = { workspace = true, features = ["time"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -74,6 +74,39 @@ impl MultiIpPacketCodec {
|
||||
Some(packets)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn buffer_timeout2(
|
||||
&mut self,
|
||||
lane_queue_lengths: nym_sdk::mixnet::LaneQueueLengths,
|
||||
) -> Option<Bytes> {
|
||||
// Wait for buffer_timeout to tick
|
||||
let _ = self.buffer_timeout.tick().await;
|
||||
|
||||
// wait for lane_queue_lenghts to go to zero
|
||||
{
|
||||
let now = std::time::Instant::now();
|
||||
loop {
|
||||
let lane_queue_lengths = lane_queue_lengths.lock().unwrap().total();
|
||||
if lane_queue_lengths < 10 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
let elapsed = now.elapsed();
|
||||
tracing::error!(
|
||||
"Waited for lane_queue_lengths to go to zero for {:?}",
|
||||
elapsed
|
||||
);
|
||||
}
|
||||
|
||||
// Flush the buffer and return it
|
||||
let packets = self.flush_current_buffer();
|
||||
if packets.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(packets)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder<Bytes> for MultiIpPacketCodec {
|
||||
|
||||
@@ -103,4 +103,8 @@ impl LaneQueueLengthsInner {
|
||||
{
|
||||
self.map.entry(*lane).and_modify(f);
|
||||
}
|
||||
|
||||
pub fn total(&self) -> usize {
|
||||
self.map.values().sum()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ pub use nym_sphinx::{
|
||||
pub use nym_statistics_common::clients::{
|
||||
connection::ConnectionStatsEvent, ClientStatsEvents, ClientStatsSender,
|
||||
};
|
||||
pub use nym_task::connections::TransmissionLane;
|
||||
pub use nym_task::connections::{LaneQueueLengths, TransmissionLane};
|
||||
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
|
||||
pub use paths::StoragePaths;
|
||||
pub use socks5_client::Socks5MixnetClient;
|
||||
|
||||
Reference in New Issue
Block a user