Compare commits

...

3 Commits

Author SHA1 Message Date
Jon Häggblad 3bbc812c2c wip 2025-03-12 11:44:39 +01:00
Jon Häggblad b2fb2c79b2 wip 2025-03-12 11:44:39 +01:00
Jon Häggblad 2df817082b wip 2025-03-12 11:44:39 +01:00
12 changed files with 149 additions and 64 deletions
Generated
+2
View File
@@ -5960,6 +5960,7 @@ dependencies = [
"bytes",
"nym-bin-common",
"nym-crypto",
"nym-sdk",
"nym-service-provider-requests-common",
"nym-sphinx",
"rand 0.8.5",
@@ -5968,6 +5969,7 @@ dependencies = [
"time",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
+2 -2
View File
@@ -117,7 +117,7 @@ impl Drop for Handler {
}
impl Handler {
async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> Option<ServerResponse> {
async fn get_lane_queue_length(&mut self, connection_id: ConnectionId) -> Option<ServerResponse> {
let req_start = Instant::now();
// get the base queue length
@@ -282,7 +282,7 @@ impl Handler {
None
}
async fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
async fn handle_get_lane_queue_length(&mut self, connection_id: u64) -> Option<ServerResponse> {
self.get_lane_queue_length(connection_id).await
}
@@ -18,7 +18,7 @@ pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;
pub mod transceiver;
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 5;
const MAX_FAILURE_COUNT: usize = 100;
// that's also disgusting.
@@ -96,6 +96,7 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
info!("on_messages: received {} mix_packets", mix_packets.len());
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
@@ -69,7 +69,7 @@ where
// offload reply handling to the dedicated task
if let Err(err) = self
.reply_controller_sender
.send_reply(recipient_tag, data, lane)
.send_reply(recipient_tag, data, lane).await
{
if !self.task_client.is_shutdown_poll() {
error!("failed to send a reply - {err}");
@@ -111,12 +111,17 @@ where
}
async fn on_input_message(&mut self, msg: InputMessage) {
log::info!(
"input_message_listener: received a message: {:?}",
msg.lane()
);
match msg {
InputMessage::Regular {
recipient,
data,
lane,
} => {
log::info!("input_message_listener: regular");
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
.await
}
@@ -126,6 +131,7 @@ where
reply_surbs,
lane,
} => {
log::info!("input_message_listener: anonymous");
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
}
@@ -134,9 +140,13 @@ where
data,
lane,
} => {
log::info!("input_message_listener: reply");
self.handle_reply(recipient_tag, data, lane).await;
}
InputMessage::Premade { msgs, lane } => self.handle_premade_packets(msgs, lane).await,
InputMessage::Premade { msgs, lane } => {
log::info!("input_message_listener: premade");
self.handle_premade_packets(msgs, lane).await
}
InputMessage::MessageWrapper {
message,
packet_type,
@@ -146,6 +156,8 @@ where
data,
lane,
} => {
log::info!("input_message_listener: wrapper regular");
self.handle_plain_message(recipient, data, lane, packet_type)
.await
}
@@ -155,6 +167,7 @@ where
reply_surbs,
lane,
} => {
log::info!("input_message_listener: wrapper anonymous");
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
}
@@ -163,9 +176,11 @@ where
data,
lane,
} => {
log::info!("input_message_listener: wrapper reply");
self.handle_reply(recipient_tag, data, lane).await;
}
InputMessage::Premade { msgs, lane } => {
log::info!("input_message_listener: wrapper premade");
self.handle_premade_packets(msgs, lane).await
}
// MessageWrappers can't be nested
@@ -99,11 +99,15 @@ where
} => {
// if this is retransmission for reply, offload it to the dedicated task
// that deals with all the surbs
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
) {
if let Err(err) = self
.reply_controller_sender
.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
)
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send retransmission data to the reply controller: {err}");
}
@@ -230,7 +230,7 @@ where
let (next_message, fragment_id, packet_size) = match next_message {
StreamMessage::Cover => {
let cover_traffic_packet_size = self.loop_cover_message_size();
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
info!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
// TODO for way down the line: in very rare cases (during topology update) we might have
// to wait a really tiny bit before actually obtaining the permit hence messing with our
@@ -277,6 +277,8 @@ where
}
};
let mix_tx_capacity = self.mix_tx.capacity();
log::info!("mix_tx_capacity: {mix_tx_capacity}");
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send: {err}");
@@ -536,13 +538,13 @@ where
format!("Packet backlog: {lane_status}, avg delay: {delay}ms ({mult})")
};
if packets > 1000 {
log::warn!("{status_str}");
} else if packets > 0 {
// if packets > 1000 {
// log::warn!("{status_str}");
// } else if packets > 0 {
log::info!("{status_str}");
} else {
log::debug!("{status_str}");
}
// } else {
// log::debug!("{status_str}");
// }
// Send status message to whoever is listening (possibly UI)
if mult == self.sending_delay_controller.max_multiplier() {
@@ -559,7 +561,7 @@ where
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
let mut status_timer = tokio::time::interval(Duration::from_secs(1));
while !shutdown.is_shutdown() {
tokio::select! {
@@ -572,6 +574,7 @@ where
self.log_status(&mut shutdown);
}
next_message = self.next() => if let Some(next_message) = next_message {
self.log_status(&mut shutdown);
self.on_message(next_message).await;
} else {
log::trace!("OutQueueControl: Stopping since channel closed");
@@ -224,7 +224,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
msgs.into_iter().map(Into::into).collect()
}
fn handle_reconstructed_repliable_messages(
async fn handle_reconstructed_repliable_messages(
&mut self,
msgs: Vec<RepliableMessage>,
) -> Vec<ReconstructedMessage> {
@@ -261,11 +261,11 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
};
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if let Err(err) = self
.reply_controller_sender
.send_additional_surbs(msg.sender_tag, reply_surbs, from_surb_request)
.await
{
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
@@ -274,7 +274,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reconstructed
}
fn handle_reconstructed_reply_messages(
async fn handle_reconstructed_reply_messages(
&mut self,
msgs: Vec<ReplyMessage>,
) -> Vec<ReconstructedMessage> {
@@ -287,6 +287,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
if let Err(err) = self
.reply_controller_sender
.send_additional_surbs_request(*recipient, amount)
.await
{
if !self.task_client.is_shutdown_poll() {
error!("{err}");
@@ -316,10 +317,16 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
let mut reconstructed_messages = self.handle_reconstructed_plain_messages(plain_messages);
reconstructed_messages
.append(&mut self.handle_reconstructed_repliable_messages(repliable_messages));
reconstructed_messages
.append(&mut self.handle_reconstructed_reply_messages(reply_messages));
reconstructed_messages.append(
&mut self
.handle_reconstructed_repliable_messages(repliable_messages)
.await,
);
reconstructed_messages.append(
&mut self
.handle_reconstructed_reply_messages(reply_messages)
.await,
);
let mut inner_guard = self.inner.lock().await;
debug!(
@@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use futures::{
channel::{mpsc, oneshot},
SinkExt,
};
use log::error;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
@@ -11,117 +14,128 @@ use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = mpsc::channel(8);
(tx.into(), rx)
}
#[derive(Debug, thiserror::Error)]
pub enum ReplyControllerSenderError {
#[error("failed to send retransmission data to reply controller")]
SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
// SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
SendRetransmissionData,
#[error("failed to send reply to reply controller")]
SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
// SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
SendReply,
#[error("failed to send additional surbs to reply controller")]
AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
// AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
AdditionalSurbs,
#[error("failed to send additional surbs request to reply controller")]
AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
// AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
AdditionalSurbsRequest,
#[error("failed to request lane queue length from reply controller")]
LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
// LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
LaneQueueLength,
#[error("response channel was dropped before we could receive the response")]
ResponseChannelDropped(#[source] oneshot::Canceled),
// ResponseChannelDropped(#[source] oneshot::Canceled),
ResponseChannelDropped,
}
#[derive(Debug, Clone)]
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
pub struct ReplyControllerSender(mpsc::Sender<ReplyControllerMessage>);
impl From<mpsc::UnboundedSender<ReplyControllerMessage>> for ReplyControllerSender {
fn from(inner: mpsc::UnboundedSender<ReplyControllerMessage>) -> Self {
impl From<mpsc::Sender<ReplyControllerMessage>> for ReplyControllerSender {
fn from(inner: mpsc::Sender<ReplyControllerMessage>) -> Self {
ReplyControllerSender(inner)
}
}
impl ReplyControllerSender {
pub(crate) fn send_retransmission_data(
&self,
pub(crate) async fn send_retransmission_data(
&mut self,
recipient: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
extra_surb_request: bool,
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::RetransmitReply {
.send(ReplyControllerMessage::RetransmitReply {
recipient,
timed_out_ack,
extra_surb_request,
})
.map_err(ReplyControllerSenderError::SendRetransmissionData)
.await
.map_err(|_| ReplyControllerSenderError::SendRetransmissionData)
}
pub(crate) fn send_reply(
&self,
pub(crate) async fn send_reply(
&mut self,
recipient: AnonymousSenderTag,
message: Vec<u8>,
lane: TransmissionLane,
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::SendReply {
.send(ReplyControllerMessage::SendReply {
recipient,
message,
lane,
})
.map_err(ReplyControllerSenderError::SendReply)
.await
.map_err(|_| ReplyControllerSenderError::SendReply)
}
pub(crate) fn send_additional_surbs(
&self,
pub(crate) async fn send_additional_surbs(
&mut self,
sender_tag: AnonymousSenderTag,
reply_surbs: Vec<ReplySurb>,
from_surb_request: bool,
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
.send(ReplyControllerMessage::AdditionalSurbs {
sender_tag,
reply_surbs,
from_surb_request,
})
.map_err(ReplyControllerSenderError::AdditionalSurbs)
.await
.map_err(|_| ReplyControllerSenderError::AdditionalSurbs)
}
pub(crate) fn send_additional_surbs_request(
&self,
pub(crate) async fn send_additional_surbs_request(
&mut self,
recipient: Recipient,
amount: u32,
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
.send(ReplyControllerMessage::AdditionalSurbsRequest {
recipient: Box::new(recipient),
amount,
})
.map_err(ReplyControllerSenderError::AdditionalSurbsRequest)
.await
.map_err(|_| ReplyControllerSenderError::AdditionalSurbsRequest)
}
pub async fn get_lane_queue_length(
&self,
&mut self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
let (response_tx, response_rx) = oneshot::channel();
if let Err(err) = self
if let Err(_err) = self
.0
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
.send(ReplyControllerMessage::LaneQueueLength {
connection_id,
response_channel: response_tx,
})
.await
{
return Err(ReplyControllerSenderError::LaneQueueLength(err));
return Err(ReplyControllerSenderError::LaneQueueLength);
}
response_rx
.await
.map_err(ReplyControllerSenderError::ResponseChannelDropped)
.map_err(|_| ReplyControllerSenderError::ResponseChannelDropped)
}
}
@@ -137,7 +151,7 @@ impl ReplyQueueLengths {
}
pub async fn get_lane_queue_length(
&self,
&mut self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
self.reply_controller_sender
@@ -146,7 +160,7 @@ impl ReplyQueueLengths {
}
}
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
pub(crate) type ReplyControllerReceiver = mpsc::Receiver<ReplyControllerMessage>;
#[derive(Debug)]
pub enum ReplyControllerMessage {
+2
View File
@@ -21,3 +21,5 @@ thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio-util = { workspace = true, features = ["codec"] }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
tracing.workspace = true
+33
View File
@@ -74,6 +74,39 @@ impl MultiIpPacketCodec {
Some(packets)
}
}
pub async fn buffer_timeout2(
&mut self,
lane_queue_lengths: nym_sdk::mixnet::LaneQueueLengths,
) -> Option<Bytes> {
// Wait for buffer_timeout to tick
let _ = self.buffer_timeout.tick().await;
// wait for lane_queue_lenghts to go to zero
{
let now = std::time::Instant::now();
loop {
let lane_queue_lengths = lane_queue_lengths.lock().unwrap().total();
if lane_queue_lengths < 10 {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
let elapsed = now.elapsed();
tracing::error!(
"Waited for lane_queue_lengths to go to zero for {:?}",
elapsed
);
}
// Flush the buffer and return it
let packets = self.flush_current_buffer();
if packets.is_empty() {
None
} else {
Some(packets)
}
}
}
impl Encoder<Bytes> for MultiIpPacketCodec {
+4
View File
@@ -103,4 +103,8 @@ impl LaneQueueLengthsInner {
{
self.map.entry(*lane).and_modify(f);
}
pub fn total(&self) -> usize {
self.map.values().sum()
}
}
+1 -1
View File
@@ -82,7 +82,7 @@ pub use nym_sphinx::{
pub use nym_statistics_common::clients::{
connection::ConnectionStatsEvent, ClientStatsEvents, ClientStatsSender,
};
pub use nym_task::connections::TransmissionLane;
pub use nym_task::connections::{LaneQueueLengths, TransmissionLane};
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
pub use paths::StoragePaths;
pub use socks5_client::Socks5MixnetClient;