Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d21cbc9746 | |||
| d4d739e267 | |||
| 433214a9d8 | |||
| fb74d243d5 | |||
| ee54efe54b |
Generated
+1
@@ -581,6 +581,7 @@ name = "client-connections"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures",
|
"futures",
|
||||||
|
"log",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -8,13 +8,15 @@
|
|||||||
use self::{
|
use self::{
|
||||||
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
|
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
|
||||||
};
|
};
|
||||||
use crate::client::real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors;
|
use crate::{
|
||||||
use crate::client::{
|
client::{
|
||||||
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
|
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
|
||||||
topology_control::TopologyAccessor,
|
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
|
||||||
|
topology_control::TopologyAccessor,
|
||||||
|
},
|
||||||
|
spawn_future,
|
||||||
};
|
};
|
||||||
use crate::spawn_future;
|
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths};
|
||||||
use client_connections::ClosedConnectionReceiver;
|
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use gateway_client::AcknowledgementReceiver;
|
use gateway_client::AcknowledgementReceiver;
|
||||||
use log::*;
|
use log::*;
|
||||||
@@ -104,6 +106,7 @@ where
|
|||||||
// obviously when we finally make shared rng that is on 'higher' level, this should become
|
// obviously when we finally make shared rng that is on 'higher' level, this should become
|
||||||
// generic `R`
|
// generic `R`
|
||||||
impl RealMessagesController<OsRng> {
|
impl RealMessagesController<OsRng> {
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: Config,
|
config: Config,
|
||||||
ack_receiver: AcknowledgementReceiver,
|
ack_receiver: AcknowledgementReceiver,
|
||||||
@@ -111,6 +114,7 @@ impl RealMessagesController<OsRng> {
|
|||||||
mix_sender: BatchMixMessageSender,
|
mix_sender: BatchMixMessageSender,
|
||||||
topology_access: TopologyAccessor,
|
topology_access: TopologyAccessor,
|
||||||
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
|
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
closed_connection_rx: ClosedConnectionReceiver,
|
closed_connection_rx: ClosedConnectionReceiver,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let rng = OsRng;
|
let rng = OsRng;
|
||||||
@@ -161,6 +165,7 @@ impl RealMessagesController<OsRng> {
|
|||||||
rng,
|
rng,
|
||||||
config.self_recipient,
|
config.self_recipient,
|
||||||
topology_access,
|
topology_access,
|
||||||
|
lane_queue_lengths,
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,9 @@
|
|||||||
use crate::client::mix_traffic::BatchMixMessageSender;
|
use crate::client::mix_traffic::BatchMixMessageSender;
|
||||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||||
use crate::client::topology_control::TopologyAccessor;
|
use crate::client::topology_control::TopologyAccessor;
|
||||||
use client_connections::{ClosedConnectionReceiver, ConnectionId, TransmissionLane};
|
use client_connections::{
|
||||||
|
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||||
|
};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use futures::{Future, Stream, StreamExt};
|
use futures::{Future, Stream, StreamExt};
|
||||||
@@ -134,6 +136,9 @@ where
|
|||||||
/// Incoming channel for being notified of closed connections, so that we can close lanes
|
/// Incoming channel for being notified of closed connections, so that we can close lanes
|
||||||
/// corresponding to connections. To avoid sending traffic unnecessary
|
/// corresponding to connections. To avoid sending traffic unnecessary
|
||||||
closed_connection_rx: ClosedConnectionReceiver,
|
closed_connection_rx: ClosedConnectionReceiver,
|
||||||
|
|
||||||
|
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct RealMessage {
|
pub(crate) struct RealMessage {
|
||||||
@@ -177,6 +182,7 @@ where
|
|||||||
rng: R,
|
rng: R,
|
||||||
our_full_destination: Recipient,
|
our_full_destination: Recipient,
|
||||||
topology_access: TopologyAccessor,
|
topology_access: TopologyAccessor,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
closed_connection_rx: ClosedConnectionReceiver,
|
closed_connection_rx: ClosedConnectionReceiver,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
OutQueueControl {
|
OutQueueControl {
|
||||||
@@ -192,9 +198,14 @@ where
|
|||||||
topology_access,
|
topology_access,
|
||||||
transmission_buffer: Default::default(),
|
transmission_buffer: Default::default(),
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
|
lane_queue_lengths,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//pub fn get_lane_queue_length(&self) -> &LaneQueueLength {
|
||||||
|
// &self.lane_queue_length
|
||||||
|
//}
|
||||||
|
|
||||||
fn sent_notify(&self, frag_id: FragmentIdentifier) {
|
fn sent_notify(&self, frag_id: FragmentIdentifier) {
|
||||||
// well technically the message was not sent just yet, but now it's up to internal
|
// well technically the message was not sent just yet, but now it's up to internal
|
||||||
// queues and client load rather than the required delay. So realistically we can treat
|
// queues and client load rather than the required delay. So realistically we can treat
|
||||||
@@ -272,9 +283,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn on_close_connection(&mut self, connection_id: ConnectionId) {
|
fn on_close_connection(&mut self, connection_id: ConnectionId) {
|
||||||
log::debug!("Removing lane for connection: {connection_id}");
|
//log::debug!("Removing lane for connection: {connection_id}");
|
||||||
self.transmission_buffer
|
//self.transmission_buffer
|
||||||
.remove(&TransmissionLane::ConnectionId(connection_id));
|
//.remove(&TransmissionLane::ConnectionId(connection_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_average_message_sending_delay(&self) -> Duration {
|
fn current_average_message_sending_delay(&self) -> Duration {
|
||||||
@@ -309,6 +320,17 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pop_next_message(&mut self) -> Option<RealMessage> {
|
||||||
|
// Pop the next message from the transmission buffer
|
||||||
|
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
|
||||||
|
|
||||||
|
// Update the published queue length
|
||||||
|
let lane_length = self.transmission_buffer.lane_length(&lane);
|
||||||
|
self.lane_queue_lengths.set(&lane, lane_length);
|
||||||
|
|
||||||
|
Some(real_next)
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
// The average delay could change depending on if backpressure in the downstream channel
|
// The average delay could change depending on if backpressure in the downstream channel
|
||||||
// (mix_tx) was detected.
|
// (mix_tx) was detected.
|
||||||
@@ -359,16 +381,13 @@ where
|
|||||||
log::trace!("handling real_messages: size: {}", real_messages.len());
|
log::trace!("handling real_messages: size: {}", real_messages.len());
|
||||||
|
|
||||||
self.transmission_buffer.store(&conn_id, real_messages);
|
self.transmission_buffer.store(&conn_id, real_messages);
|
||||||
let real_next = self
|
let real_next = self.pop_next_message().expect("Just stored one");
|
||||||
.transmission_buffer
|
|
||||||
.pop_next_message_at_random()
|
|
||||||
.expect("we just added one");
|
|
||||||
|
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
|
if let Some(real_next) = self.pop_next_message() {
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||||
} else {
|
} else {
|
||||||
// otherwise construct a dummy one
|
// otherwise construct a dummy one
|
||||||
@@ -411,16 +430,13 @@ where
|
|||||||
|
|
||||||
// First store what we got for the given connection id
|
// First store what we got for the given connection id
|
||||||
self.transmission_buffer.store(&conn_id, real_messages);
|
self.transmission_buffer.store(&conn_id, real_messages);
|
||||||
let real_next = self
|
let real_next = self.pop_next_message().expect("we just added one");
|
||||||
.transmission_buffer
|
|
||||||
.pop_next_message_at_random()
|
|
||||||
.expect("we just added one");
|
|
||||||
|
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
|
if let Some(real_next) = self.pop_next_message() {
|
||||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||||
} else {
|
} else {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
|||||||
+7
-2
@@ -41,6 +41,10 @@ impl TransmissionBuffer {
|
|||||||
self.buffer.keys().count()
|
self.buffer.keys().count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn lane_length(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||||
|
self.buffer.get(lane).map(LaneBufferEntry::len)
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub(crate) fn connections(&self) -> HashSet<u64> {
|
pub(crate) fn connections(&self) -> HashSet<u64> {
|
||||||
self.buffer
|
self.buffer
|
||||||
@@ -127,7 +131,7 @@ impl TransmissionBuffer {
|
|||||||
Some(real_next)
|
Some(real_next)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<RealMessage> {
|
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
|
||||||
if self.buffer.is_empty() {
|
if self.buffer.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -142,8 +146,9 @@ impl TransmissionBuffer {
|
|||||||
*self.pick_random_lane()?
|
*self.pick_random_lane()?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let msg = self.pop_front_from_lane(&lane)?;
|
||||||
log::trace!("picking to send from lane: {:?}", lane);
|
log::trace!("picking to send from lane: {:?}", lane);
|
||||||
self.pop_front_from_lane(&lane)
|
Some((lane, msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn prune_stale_connections(&mut self) {
|
pub(crate) fn prune_stale_connections(&mut self) {
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, TransmissionLane};
|
use client_connections::{
|
||||||
|
ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths, TransmissionLane,
|
||||||
|
};
|
||||||
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||||
use client_core::client::inbound_messages::{
|
use client_core::client::inbound_messages::{
|
||||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||||
@@ -119,6 +121,7 @@ impl NymClient {
|
|||||||
ack_receiver: AcknowledgementReceiver,
|
ack_receiver: AcknowledgementReceiver,
|
||||||
input_receiver: InputMessageReceiver,
|
input_receiver: InputMessageReceiver,
|
||||||
mix_sender: BatchMixMessageSender,
|
mix_sender: BatchMixMessageSender,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
closed_connection_rx: ClosedConnectionReceiver,
|
closed_connection_rx: ClosedConnectionReceiver,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
@@ -149,6 +152,7 @@ impl NymClient {
|
|||||||
mix_sender,
|
mix_sender,
|
||||||
topology_accessor,
|
topology_accessor,
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
|
lane_queue_lengths,
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
)
|
)
|
||||||
.start_with_shutdown(shutdown);
|
.start_with_shutdown(shutdown);
|
||||||
@@ -283,6 +287,7 @@ impl NymClient {
|
|||||||
&self,
|
&self,
|
||||||
buffer_requester: ReceivedBufferRequestSender,
|
buffer_requester: ReceivedBufferRequestSender,
|
||||||
msg_input: InputMessageSender,
|
msg_input: InputMessageSender,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
) {
|
) {
|
||||||
info!("Starting websocket listener...");
|
info!("Starting websocket listener...");
|
||||||
@@ -292,6 +297,7 @@ impl NymClient {
|
|||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
buffer_requester,
|
buffer_requester,
|
||||||
&self.as_mix_recipient(),
|
&self.as_mix_recipient(),
|
||||||
|
lane_queue_lengths,
|
||||||
);
|
);
|
||||||
|
|
||||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||||
@@ -417,12 +423,17 @@ impl NymClient {
|
|||||||
// controller that connections are closed.
|
// controller that connections are closed.
|
||||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||||
|
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
|
||||||
|
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
ack_receiver,
|
ack_receiver,
|
||||||
input_receiver,
|
input_receiver,
|
||||||
sphinx_message_sender.clone(),
|
sphinx_message_sender.clone(),
|
||||||
|
shared_lane_queue_lengths.clone(),
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
);
|
);
|
||||||
@@ -443,6 +454,7 @@ impl NymClient {
|
|||||||
SocketType::WebSocket => self.start_websocket_listener(
|
SocketType::WebSocket => self.start_websocket_listener(
|
||||||
received_buffer_request_sender,
|
received_buffer_request_sender,
|
||||||
input_sender,
|
input_sender,
|
||||||
|
shared_lane_queue_lengths,
|
||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
),
|
),
|
||||||
SocketType::None => {
|
SocketType::None => {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use client_connections::{ClosedConnectionSender, TransmissionLane};
|
use client_connections::{ClosedConnectionSender, LaneQueueLengths, TransmissionLane};
|
||||||
use client_core::client::{
|
use client_core::client::{
|
||||||
inbound_messages::{InputMessage, InputMessageSender},
|
inbound_messages::{InputMessage, InputMessageSender},
|
||||||
received_buffer::{
|
received_buffer::{
|
||||||
@@ -40,6 +40,7 @@ pub(crate) struct Handler {
|
|||||||
self_full_address: Recipient,
|
self_full_address: Recipient,
|
||||||
socket: Option<WebSocketStream<TcpStream>>,
|
socket: Option<WebSocketStream<TcpStream>>,
|
||||||
received_response_type: ReceivedResponseType,
|
received_response_type: ReceivedResponseType,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone is used to use handler on a new connection, which initially is `None`
|
// clone is used to use handler on a new connection, which initially is `None`
|
||||||
@@ -52,6 +53,7 @@ impl Clone for Handler {
|
|||||||
self_full_address: self.self_full_address,
|
self_full_address: self.self_full_address,
|
||||||
socket: None,
|
socket: None,
|
||||||
received_response_type: Default::default(),
|
received_response_type: Default::default(),
|
||||||
|
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -70,6 +72,7 @@ impl Handler {
|
|||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
buffer_requester: ReceivedBufferRequestSender,
|
buffer_requester: ReceivedBufferRequestSender,
|
||||||
self_full_address: &Recipient,
|
self_full_address: &Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Handler {
|
Handler {
|
||||||
msg_input,
|
msg_input,
|
||||||
@@ -78,6 +81,7 @@ impl Handler {
|
|||||||
self_full_address: *self_full_address,
|
self_full_address: *self_full_address,
|
||||||
socket: None,
|
socket: None,
|
||||||
received_response_type: Default::default(),
|
received_response_type: Default::default(),
|
||||||
|
lane_queue_lengths,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,7 +97,14 @@ impl Handler {
|
|||||||
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
|
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
|
||||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
self.msg_input.unbounded_send(input_msg).unwrap();
|
||||||
|
|
||||||
None
|
// WIP(JON)
|
||||||
|
let queue_lengh = self
|
||||||
|
.lane_queue_lengths
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.get(&lane)
|
||||||
|
.unwrap_or(0);
|
||||||
|
Some(ServerResponse::LaneQueueLength(connection_id, queue_lengh))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use crate::socks::{
|
|||||||
authentication::{AuthenticationMethods, Authenticator, User},
|
authentication::{AuthenticationMethods, Authenticator, User},
|
||||||
server::SphinxSocksServer,
|
server::SphinxSocksServer,
|
||||||
};
|
};
|
||||||
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender};
|
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths};
|
||||||
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||||
use client_core::client::inbound_messages::{
|
use client_core::client::inbound_messages::{
|
||||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||||
@@ -120,6 +120,7 @@ impl NymClient {
|
|||||||
input_receiver: InputMessageReceiver,
|
input_receiver: InputMessageReceiver,
|
||||||
mix_sender: BatchMixMessageSender,
|
mix_sender: BatchMixMessageSender,
|
||||||
closed_connection_rx: ClosedConnectionReceiver,
|
closed_connection_rx: ClosedConnectionReceiver,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let mut controller_config = client_core::client::real_messages_control::Config::new(
|
let mut controller_config = client_core::client::real_messages_control::Config::new(
|
||||||
@@ -149,6 +150,7 @@ impl NymClient {
|
|||||||
mix_sender,
|
mix_sender,
|
||||||
topology_accessor,
|
topology_accessor,
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
|
lane_queue_lengths,
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
)
|
)
|
||||||
.start_with_shutdown(shutdown);
|
.start_with_shutdown(shutdown);
|
||||||
@@ -284,6 +286,7 @@ impl NymClient {
|
|||||||
buffer_requester: ReceivedBufferRequestSender,
|
buffer_requester: ReceivedBufferRequestSender,
|
||||||
msg_input: InputMessageSender,
|
msg_input: InputMessageSender,
|
||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
info!("Starting socks5 listener...");
|
info!("Starting socks5 listener...");
|
||||||
@@ -296,6 +299,7 @@ impl NymClient {
|
|||||||
authenticator,
|
authenticator,
|
||||||
self.config.get_provider_mix_address(),
|
self.config.get_provider_mix_address(),
|
||||||
self.as_mix_recipient(),
|
self.as_mix_recipient(),
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
);
|
);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -409,6 +413,10 @@ impl NymClient {
|
|||||||
// This will be forwarded to `OutQueueControl`
|
// This will be forwarded to `OutQueueControl`
|
||||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||||
|
// primarily to throttle incoming connections
|
||||||
|
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
@@ -416,6 +424,7 @@ impl NymClient {
|
|||||||
input_receiver,
|
input_receiver,
|
||||||
sphinx_message_sender.clone(),
|
sphinx_message_sender.clone(),
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
|
shared_lane_queue_lengths.clone(),
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -435,6 +444,7 @@ impl NymClient {
|
|||||||
received_buffer_request_sender,
|
received_buffer_request_sender,
|
||||||
input_sender,
|
input_sender,
|
||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
|
shared_lane_queue_lengths,
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use super::authentication::{AuthenticationMethods, Authenticator, User};
|
|||||||
use super::request::{SocksCommand, SocksRequest};
|
use super::request::{SocksCommand, SocksRequest};
|
||||||
use super::types::{ResponseCode, SocksProxyError};
|
use super::types::{ResponseCode, SocksProxyError};
|
||||||
use super::{RESERVED, SOCKS_VERSION};
|
use super::{RESERVED, SOCKS_VERSION};
|
||||||
use client_connections::TransmissionLane;
|
use client_connections::{LaneQueueLengths, TransmissionLane};
|
||||||
use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
|
use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
use task::ShutdownListener;
|
use task::ShutdownListener;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||||
use tokio::{self, net::TcpStream};
|
use tokio::{self, net::TcpStream};
|
||||||
@@ -141,7 +142,9 @@ pub(crate) struct SocksClient {
|
|||||||
service_provider: Recipient,
|
service_provider: Recipient,
|
||||||
self_address: Recipient,
|
self_address: Recipient,
|
||||||
started_proxy: bool,
|
started_proxy: bool,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown_listener: ShutdownListener,
|
shutdown_listener: ShutdownListener,
|
||||||
|
active_connections: Arc<std::sync::Mutex<u64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for SocksClient {
|
impl Drop for SocksClient {
|
||||||
@@ -165,9 +168,17 @@ impl SocksClient {
|
|||||||
service_provider: Recipient,
|
service_provider: Recipient,
|
||||||
controller_sender: ControllerSender,
|
controller_sender: ControllerSender,
|
||||||
self_address: Recipient,
|
self_address: Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown_listener: ShutdownListener,
|
shutdown_listener: ShutdownListener,
|
||||||
|
active_connections: Arc<std::sync::Mutex<u64>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let connection_id = Self::generate_random();
|
let connection_id = Self::generate_random();
|
||||||
|
let u_active_connections = {
|
||||||
|
let g = active_connections.lock().unwrap();
|
||||||
|
*g
|
||||||
|
};
|
||||||
|
error!("client active_connections: {}", u_active_connections);
|
||||||
|
|
||||||
SocksClient {
|
SocksClient {
|
||||||
controller_sender,
|
controller_sender,
|
||||||
connection_id,
|
connection_id,
|
||||||
@@ -179,7 +190,9 @@ impl SocksClient {
|
|||||||
service_provider,
|
service_provider,
|
||||||
self_address,
|
self_address,
|
||||||
started_proxy: false,
|
started_proxy: false,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown_listener,
|
shutdown_listener,
|
||||||
|
active_connections,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,6 +218,8 @@ impl SocksClient {
|
|||||||
/// is in use and that the client is authenticated, then runs the request.
|
/// is in use and that the client is authenticated, then runs the request.
|
||||||
pub async fn run(&mut self) -> Result<(), SocksProxyError> {
|
pub async fn run(&mut self) -> Result<(), SocksProxyError> {
|
||||||
debug!("New connection from: {}", self.stream.peer_addr()?.ip());
|
debug!("New connection from: {}", self.stream.peer_addr()?.ip());
|
||||||
|
//dbg!(&self.stream.peer_addr());
|
||||||
|
|
||||||
let mut header = [0u8; 2];
|
let mut header = [0u8; 2];
|
||||||
// Read a byte from the stream and determine the version being requested
|
// Read a byte from the stream and determine the version being requested
|
||||||
self.stream.read_exact(&mut header).await?;
|
self.stream.read_exact(&mut header).await?;
|
||||||
@@ -258,6 +273,7 @@ impl SocksClient {
|
|||||||
conn_receiver,
|
conn_receiver,
|
||||||
input_sender,
|
input_sender,
|
||||||
connection_id,
|
connection_id,
|
||||||
|
self.lane_queue_lengths.clone(),
|
||||||
self.shutdown_listener.clone(),
|
self.shutdown_listener.clone(),
|
||||||
)
|
)
|
||||||
.run(move |conn_id, read_data, socket_closed| {
|
.run(move |conn_id, read_data, socket_closed| {
|
||||||
@@ -275,10 +291,24 @@ impl SocksClient {
|
|||||||
/// Handles a client request.
|
/// Handles a client request.
|
||||||
async fn handle_request(&mut self) -> Result<(), SocksProxyError> {
|
async fn handle_request(&mut self) -> Result<(), SocksProxyError> {
|
||||||
debug!("Handling CONNECT Command");
|
debug!("Handling CONNECT Command");
|
||||||
|
let active_connections = {
|
||||||
|
let g = self.active_connections.lock().unwrap();
|
||||||
|
*g
|
||||||
|
};
|
||||||
|
error!("active_connections: {}", active_connections);
|
||||||
|
|
||||||
let request = SocksRequest::from_stream(&mut self.stream).await?;
|
let request = SocksRequest::from_stream(&mut self.stream).await?;
|
||||||
let remote_address = request.to_string();
|
let remote_address = request.to_string();
|
||||||
|
|
||||||
|
if active_connections > 50 {
|
||||||
|
log::warn!(
|
||||||
|
"Refusing SOCKS5: too many connections: {}",
|
||||||
|
active_connections
|
||||||
|
);
|
||||||
|
self.refuse_connection_socks5().await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// setup for receiving from the mixnet
|
// setup for receiving from the mixnet
|
||||||
let (mix_sender, mix_receiver) = mpsc::unbounded();
|
let (mix_sender, mix_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
@@ -332,6 +362,24 @@ impl SocksClient {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn refuse_connection_socks5(&mut self) {
|
||||||
|
self.stream
|
||||||
|
.write_all(&[
|
||||||
|
SOCKS_VERSION,
|
||||||
|
ResponseCode::RuleFailure as u8,
|
||||||
|
RESERVED,
|
||||||
|
1,
|
||||||
|
127,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
])
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
/// Authenticate the incoming request. Each request is checked for its
|
/// Authenticate the incoming request. Each request is checked for its
|
||||||
/// authentication method. A user/password request will extract the
|
/// authentication method. A user/password request will extract the
|
||||||
/// username and password from the stream, then check with the Authenticator
|
/// username and password from the stream, then check with the Authenticator
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use super::{
|
|||||||
mixnet_responses::MixnetResponseListener,
|
mixnet_responses::MixnetResponseListener,
|
||||||
types::{ResponseCode, SocksProxyError},
|
types::{ResponseCode, SocksProxyError},
|
||||||
};
|
};
|
||||||
use client_connections::ClosedConnectionSender;
|
use client_connections::{ClosedConnectionSender, LaneQueueLengths};
|
||||||
use client_core::client::{
|
use client_core::client::{
|
||||||
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
|
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
|
||||||
};
|
};
|
||||||
@@ -12,6 +12,7 @@ use log::*;
|
|||||||
use nymsphinx::addressing::clients::Recipient;
|
use nymsphinx::addressing::clients::Recipient;
|
||||||
use proxy_helpers::connection_controller::Controller;
|
use proxy_helpers::connection_controller::Controller;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
use task::ShutdownListener;
|
use task::ShutdownListener;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
@@ -21,6 +22,7 @@ pub struct SphinxSocksServer {
|
|||||||
listening_address: SocketAddr,
|
listening_address: SocketAddr,
|
||||||
service_provider: Recipient,
|
service_provider: Recipient,
|
||||||
self_address: Recipient,
|
self_address: Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -31,6 +33,7 @@ impl SphinxSocksServer {
|
|||||||
authenticator: Authenticator,
|
authenticator: Authenticator,
|
||||||
service_provider: Recipient,
|
service_provider: Recipient,
|
||||||
self_address: Recipient,
|
self_address: Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
|
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
|
||||||
@@ -42,6 +45,7 @@ impl SphinxSocksServer {
|
|||||||
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
|
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
|
||||||
service_provider,
|
service_provider,
|
||||||
self_address,
|
self_address,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -74,10 +78,16 @@ impl SphinxSocksServer {
|
|||||||
mixnet_response_listener.run().await;
|
mixnet_response_listener.run().await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let active_connections = Arc::new(std::sync::Mutex::new(0));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok((stream, _remote)) = listener.accept() => {
|
Ok((stream, _remote)) = listener.accept() => {
|
||||||
// TODO Optimize this
|
// TODO Optimize this
|
||||||
|
{
|
||||||
|
let mut g = active_connections.lock().unwrap();
|
||||||
|
*g += 1;
|
||||||
|
}
|
||||||
let mut client = SocksClient::new(
|
let mut client = SocksClient::new(
|
||||||
stream,
|
stream,
|
||||||
self.authenticator.clone(),
|
self.authenticator.clone(),
|
||||||
@@ -85,9 +95,12 @@ impl SphinxSocksServer {
|
|||||||
self.service_provider,
|
self.service_provider,
|
||||||
controller_sender.clone(),
|
controller_sender.clone(),
|
||||||
self.self_address,
|
self.self_address,
|
||||||
|
self.lane_queue_lengths.clone(),
|
||||||
self.shutdown.clone(),
|
self.shutdown.clone(),
|
||||||
|
active_connections.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let active_connections = active_connections.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
{
|
{
|
||||||
match client.run().await {
|
match client.run().await {
|
||||||
@@ -118,6 +131,10 @@ impl SphinxSocksServer {
|
|||||||
};
|
};
|
||||||
// client gets dropped here
|
// client gets dropped here
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
let mut g = active_connections.lock().unwrap();
|
||||||
|
*g -= 1;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
|
|||||||
@@ -7,3 +7,4 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
log = "0.4.17"
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
|
|
||||||
pub type ConnectionId = u64;
|
pub type ConnectionId = u64;
|
||||||
@@ -19,3 +21,81 @@ pub enum TransmissionLane {
|
|||||||
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
||||||
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
|
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
|
||||||
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
|
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
|
||||||
|
|
||||||
|
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||||
|
// if needed.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
|
||||||
|
|
||||||
|
impl LaneQueueLengths {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
|
||||||
|
LaneQueueLengthsInner {
|
||||||
|
map: HashMap::new(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
|
||||||
|
match self.0.lock() {
|
||||||
|
Ok(mut inner) => {
|
||||||
|
if let Some(length) = lane_length {
|
||||||
|
inner
|
||||||
|
.map
|
||||||
|
.entry(*lane)
|
||||||
|
.and_modify(|e| *e = length)
|
||||||
|
.or_insert(length);
|
||||||
|
} else {
|
||||||
|
inner.map.remove(lane);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => log::warn!("Failed to set lane queue length: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||||
|
match self.0.lock() {
|
||||||
|
Ok(inner) => inner.get(lane),
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("Failed to get lane queue length: {err}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for LaneQueueLengths {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Deref for LaneQueueLengths {
|
||||||
|
type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LaneQueueLengthsInner {
|
||||||
|
pub map: HashMap<TransmissionLane, usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LaneQueueLengthsInner {
|
||||||
|
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||||
|
self.map.get(lane).copied()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn values(&self) -> impl Iterator<Item = &usize> {
|
||||||
|
self.map.values()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut usize),
|
||||||
|
{
|
||||||
|
self.map.entry(*lane).and_modify(f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,11 +5,14 @@ use super::MixProxySender;
|
|||||||
use super::SHUTDOWN_TIMEOUT;
|
use super::SHUTDOWN_TIMEOUT;
|
||||||
use crate::available_reader::AvailableReader;
|
use crate::available_reader::AvailableReader;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use client_connections::LaneQueueLengths;
|
||||||
|
use client_connections::TransmissionLane;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
use log::*;
|
||||||
use ordered_buffer::OrderedMessageSender;
|
use ordered_buffer::OrderedMessageSender;
|
||||||
use socks5_requests::ConnectionId;
|
use socks5_requests::ConnectionId;
|
||||||
|
use std::time::Duration;
|
||||||
use std::{io, sync::Arc};
|
use std::{io, sync::Arc};
|
||||||
use task::ShutdownListener;
|
use task::ShutdownListener;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
@@ -29,7 +32,7 @@ fn send_empty_close<F, S>(
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deal_with_data<F, S>(
|
async fn deal_with_data<F, S>(
|
||||||
read_data: Option<io::Result<Bytes>>,
|
read_data: Option<io::Result<Bytes>>,
|
||||||
local_destination_address: &str,
|
local_destination_address: &str,
|
||||||
remote_source_address: &str,
|
remote_source_address: &str,
|
||||||
@@ -37,6 +40,7 @@ fn deal_with_data<F, S>(
|
|||||||
message_sender: &mut OrderedMessageSender,
|
message_sender: &mut OrderedMessageSender,
|
||||||
mix_sender: &MixProxySender<S>,
|
mix_sender: &MixProxySender<S>,
|
||||||
adapter_fn: F,
|
adapter_fn: F,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
) -> bool
|
) -> bool
|
||||||
where
|
where
|
||||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
||||||
@@ -67,6 +71,70 @@ where
|
|||||||
"pushing data down the input sender: size: {}",
|
"pushing data down the input sender: size: {}",
|
||||||
ordered_msg.len()
|
ordered_msg.len()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// wait here until queue is not too long
|
||||||
|
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||||
|
//loop {
|
||||||
|
{
|
||||||
|
let (queue_length, est_busy_conn) = {
|
||||||
|
let mut guard = lane_queue_lengths.lock().unwrap();
|
||||||
|
//let queue_length = *guard.get(&lane).unwrap_or(&0);
|
||||||
|
//let queue_length = *guard.entry(lane).or_insert(0);
|
||||||
|
let queue_length = guard.get(&lane).unwrap_or(0);
|
||||||
|
let est_busy_conn = guard.values().filter(|v| *v > &5).count();
|
||||||
|
|
||||||
|
// We estimate the queue length for subsequent data packet. This is needed
|
||||||
|
// because there is a delay until we get the correct value back as a server
|
||||||
|
// response. (And that will have a delay baked in).
|
||||||
|
// WIP(JON): pull packet size from somewhere
|
||||||
|
let sphinx_size = 2000.0;
|
||||||
|
let msg_length = (ordered_msg.len() as f64 / sphinx_size).ceil() as usize;
|
||||||
|
guard.modify(&lane, |length| *length += msg_length);
|
||||||
|
|
||||||
|
(queue_length, est_busy_conn)
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("conn_id: {connection_id}, queue: {queue_length}");
|
||||||
|
// The heuristic here is:
|
||||||
|
// 20ms average delay => 50 packets / sec
|
||||||
|
// 500 packet queue => 10 sec behind
|
||||||
|
// This assumes it's the only active connection, and that there is no throttling
|
||||||
|
// In practive, this is a latency vs throughput tradeoff we're making here
|
||||||
|
let avererage_delay = 0.02; // TODO: read from config
|
||||||
|
let packets_per_sec = 1.0 / avererage_delay;
|
||||||
|
let ideal_time_to_clear_queue = queue_length as f64 / packets_per_sec;
|
||||||
|
if queue_length > 5000 {
|
||||||
|
log::info!("sleeping long");
|
||||||
|
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue * 5.0)).await;
|
||||||
|
} else if queue_length > 500 {
|
||||||
|
log::info!("sleeping medium");
|
||||||
|
sleep(Duration::from_secs_f64(
|
||||||
|
ideal_time_to_clear_queue * 2.0 / 3.0,
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
} else if queue_length > 5 {
|
||||||
|
log::info!("sleeping short");
|
||||||
|
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue / 3.0)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are saturated on number of connections, and this is already a busy
|
||||||
|
// connection, basically soft-stop
|
||||||
|
//if est_busy_conn > 15 && queue_length > 5 {
|
||||||
|
// log::info!("soft-stop: {connection_id}");
|
||||||
|
// sleep(Duration::from_secs_f64(5.0 * 60.0)).await;
|
||||||
|
//}
|
||||||
|
|
||||||
|
//loop {
|
||||||
|
// let count = ACTIVE_PROXIES.load(Ordering::Relaxed);
|
||||||
|
// if count + 1 > 15 {
|
||||||
|
// log::info!("Max connections reached, parking: {conn_id}");
|
||||||
|
// sleep(Duration::from_secs(10)).await;
|
||||||
|
// } else {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
|
||||||
mix_sender
|
mix_sender
|
||||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -88,6 +156,7 @@ pub(super) async fn run_inbound<F, S>(
|
|||||||
mix_sender: MixProxySender<S>,
|
mix_sender: MixProxySender<S>,
|
||||||
adapter_fn: F,
|
adapter_fn: F,
|
||||||
shutdown_notify: Arc<Notify>,
|
shutdown_notify: Arc<Notify>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
mut shutdown_listener: ShutdownListener,
|
mut shutdown_listener: ShutdownListener,
|
||||||
) -> OwnedReadHalf
|
) -> OwnedReadHalf
|
||||||
where
|
where
|
||||||
@@ -102,12 +171,24 @@ where
|
|||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
read_data = &mut available_reader.next() => {
|
read_data = &mut available_reader.next() => {
|
||||||
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
|
if deal_with_data(
|
||||||
|
read_data,
|
||||||
|
&local_destination_address,
|
||||||
|
&remote_source_address,
|
||||||
|
connection_id,
|
||||||
|
&mut message_sender,
|
||||||
|
&mix_sender,
|
||||||
|
&adapter_fn,
|
||||||
|
lane_queue_lengths.clone()
|
||||||
|
).await {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = &mut shutdown_future => {
|
_ = &mut shutdown_future => {
|
||||||
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
|
debug!(
|
||||||
|
"closing inbound proxy after outbound was closed {:?} ago",
|
||||||
|
SHUTDOWN_TIMEOUT
|
||||||
|
);
|
||||||
// inform remote just in case it was closed because of lack of heartbeat.
|
// inform remote just in case it was closed because of lack of heartbeat.
|
||||||
// worst case the remote will just have couple of false negatives
|
// worst case the remote will just have couple of false negatives
|
||||||
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
|
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::connection_controller::ConnectionReceiver;
|
use crate::connection_controller::ConnectionReceiver;
|
||||||
|
use client_connections::LaneQueueLengths;
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use socks5_requests::ConnectionId;
|
use socks5_requests::ConnectionId;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
@@ -12,7 +13,7 @@ mod inbound;
|
|||||||
mod outbound;
|
mod outbound;
|
||||||
|
|
||||||
// TODO: make this configurable
|
// TODO: make this configurable
|
||||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
|
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(300);
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ProxyMessage {
|
pub struct ProxyMessage {
|
||||||
@@ -45,6 +46,7 @@ pub struct ProxyRunner<S> {
|
|||||||
local_destination_address: String,
|
local_destination_address: String,
|
||||||
remote_source_address: String,
|
remote_source_address: String,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
|
|
||||||
// Listens to shutdown commands from higher up
|
// Listens to shutdown commands from higher up
|
||||||
shutdown_listener: ShutdownListener,
|
shutdown_listener: ShutdownListener,
|
||||||
@@ -61,6 +63,7 @@ where
|
|||||||
mix_receiver: ConnectionReceiver,
|
mix_receiver: ConnectionReceiver,
|
||||||
mix_sender: MixProxySender<S>,
|
mix_sender: MixProxySender<S>,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown_listener: ShutdownListener,
|
shutdown_listener: ShutdownListener,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
ProxyRunner {
|
ProxyRunner {
|
||||||
@@ -70,6 +73,7 @@ where
|
|||||||
local_destination_address,
|
local_destination_address,
|
||||||
remote_source_address,
|
remote_source_address,
|
||||||
connection_id,
|
connection_id,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown_listener,
|
shutdown_listener,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,7 +82,7 @@ where
|
|||||||
// request/response as required by entity running particular side of the proxy.
|
// request/response as required by entity running particular side of the proxy.
|
||||||
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
||||||
where
|
where
|
||||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
||||||
let shutdown_notify = Arc::new(Notify::new());
|
let shutdown_notify = Arc::new(Notify::new());
|
||||||
@@ -92,6 +96,7 @@ where
|
|||||||
self.mix_sender.clone(),
|
self.mix_sender.clone(),
|
||||||
adapter_fn,
|
adapter_fn,
|
||||||
Arc::clone(&shutdown_notify),
|
Arc::clone(&shutdown_notify),
|
||||||
|
self.lane_queue_lengths.clone(),
|
||||||
self.shutdown_listener.clone(),
|
self.shutdown_listener.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use tokio::select;
|
|||||||
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
|
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
|
||||||
|
|
||||||
const MIX_TTL: Duration = Duration::from_secs(5 * 60);
|
const MIX_TTL: Duration = Duration::from_secs(5 * 60);
|
||||||
|
//const MIX_TTL: Duration = Duration::from_secs(15 * 60);
|
||||||
|
|
||||||
async fn deal_with_message(
|
async fn deal_with_message(
|
||||||
connection_message: ConnectionMessage,
|
connection_message: ConnectionMessage,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use client_connections::LaneQueueLengths;
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use nymsphinx::addressing::clients::Recipient;
|
use nymsphinx::addressing::clients::Recipient;
|
||||||
use proxy_helpers::connection_controller::ConnectionReceiver;
|
use proxy_helpers::connection_controller::ConnectionReceiver;
|
||||||
@@ -41,6 +42,7 @@ impl Connection {
|
|||||||
&mut self,
|
&mut self,
|
||||||
mix_receiver: ConnectionReceiver,
|
mix_receiver: ConnectionReceiver,
|
||||||
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let stream = self.conn.take().unwrap();
|
let stream = self.conn.take().unwrap();
|
||||||
@@ -54,6 +56,7 @@ impl Connection {
|
|||||||
mix_receiver,
|
mix_receiver,
|
||||||
mix_sender,
|
mix_sender,
|
||||||
connection_id,
|
connection_id,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
.run(move |conn_id, read_data, socket_closed| {
|
.run(move |conn_id, read_data, socket_closed| {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use crate::error::NetworkRequesterError;
|
|||||||
use crate::statistics::ServiceStatisticsCollector;
|
use crate::statistics::ServiceStatisticsCollector;
|
||||||
use crate::websocket;
|
use crate::websocket;
|
||||||
use crate::websocket::TSWebsocketStream;
|
use crate::websocket::TSWebsocketStream;
|
||||||
use client_connections::ClosedConnectionReceiver;
|
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths, TransmissionLane};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::stream::{SplitSink, SplitStream};
|
use futures::stream::{SplitSink, SplitStream};
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
@@ -118,6 +118,7 @@ impl ServiceProvider {
|
|||||||
|
|
||||||
async fn read_websocket_message(
|
async fn read_websocket_message(
|
||||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
) -> Option<ReconstructedMessage> {
|
) -> Option<ReconstructedMessage> {
|
||||||
while let Some(msg) = websocket_reader.next().await {
|
while let Some(msg) = websocket_reader.next().await {
|
||||||
let data = msg
|
let data = msg
|
||||||
@@ -138,6 +139,15 @@ impl ServiceProvider {
|
|||||||
|
|
||||||
let received = match deserialized_message {
|
let received = match deserialized_message {
|
||||||
ServerResponse::Received(received) => received,
|
ServerResponse::Received(received) => received,
|
||||||
|
ServerResponse::LaneQueueLength(lane, queue_length) => {
|
||||||
|
log::info!(
|
||||||
|
"received LaneQueueLength lane: {lane}, queue_length: {queue_length}"
|
||||||
|
);
|
||||||
|
let lane = TransmissionLane::ConnectionId(lane);
|
||||||
|
let mut guard = lane_queue_lengths.lock().unwrap();
|
||||||
|
guard.map.insert(lane, queue_length);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
ServerResponse::Error(err) => {
|
ServerResponse::Error(err) => {
|
||||||
panic!("received error from native client! - {}", err)
|
panic!("received error from native client! - {}", err)
|
||||||
}
|
}
|
||||||
@@ -154,6 +164,7 @@ impl ServiceProvider {
|
|||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
controller_sender: ControllerSender,
|
controller_sender: ControllerSender,
|
||||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||||
@@ -191,7 +202,7 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// run the proxy on the connection
|
// run the proxy on the connection
|
||||||
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
|
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// proxy is done - remove the access channel from the controller
|
// proxy is done - remove the access channel from the controller
|
||||||
@@ -207,6 +218,7 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn handle_proxy_connect(
|
fn handle_proxy_connect(
|
||||||
&mut self,
|
&mut self,
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
@@ -214,6 +226,7 @@ impl ServiceProvider {
|
|||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
remote_addr: String,
|
remote_addr: String,
|
||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
||||||
@@ -241,6 +254,7 @@ impl ServiceProvider {
|
|||||||
return_address,
|
return_address,
|
||||||
controller_sender_clone,
|
controller_sender_clone,
|
||||||
mix_input_sender_clone,
|
mix_input_sender_clone,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -265,6 +279,7 @@ impl ServiceProvider {
|
|||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
stats_collector: Option<ServiceStatisticsCollector>,
|
stats_collector: Option<ServiceStatisticsCollector>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
|
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
|
||||||
@@ -290,6 +305,7 @@ impl ServiceProvider {
|
|||||||
req.conn_id,
|
req.conn_id,
|
||||||
req.remote_addr,
|
req.remote_addr,
|
||||||
req.return_address,
|
req.return_address,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -336,6 +352,10 @@ impl ServiceProvider {
|
|||||||
// `ClientRequest`.
|
// `ClientRequest`.
|
||||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||||
|
// primarily to throttle incoming connections
|
||||||
|
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||||
|
|
||||||
// Controller for managing all active connections.
|
// Controller for managing all active connections.
|
||||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||||
// requester shutdown signalling is not yet fully implemented.
|
// requester shutdown signalling is not yet fully implemented.
|
||||||
@@ -376,7 +396,12 @@ impl ServiceProvider {
|
|||||||
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
||||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||||
loop {
|
loop {
|
||||||
let received = match Self::read_websocket_message(&mut websocket_reader).await {
|
let received = match Self::read_websocket_message(
|
||||||
|
&mut websocket_reader,
|
||||||
|
shared_lane_queue_lengths.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Some(msg) => msg,
|
Some(msg) => msg,
|
||||||
None => {
|
None => {
|
||||||
error!("The websocket stream has finished!");
|
error!("The websocket stream has finished!");
|
||||||
@@ -392,6 +417,7 @@ impl ServiceProvider {
|
|||||||
&mut controller_sender,
|
&mut controller_sender,
|
||||||
&mix_input_sender,
|
&mix_input_sender,
|
||||||
stats_collector.clone(),
|
stats_collector.clone(),
|
||||||
|
shared_lane_queue_lengths.clone(),
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
Reference in New Issue
Block a user