Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad d21cbc9746 Update names 2022-11-19 11:06:57 +01:00
Jon Häggblad d4d739e267 WIP 2022-11-19 10:50:06 +01:00
Jon Häggblad 433214a9d8 Fix clippy 2022-11-19 10:41:23 +01:00
Jon Häggblad fb74d243d5 client-connections: rename to LaneQueueLenghts plural 2022-11-19 10:30:49 +01:00
Jon Häggblad ee54efe54b client-core: publish lane queue lengths 2022-11-19 10:20:08 +01:00
16 changed files with 358 additions and 36 deletions
Generated
+1
View File
@@ -581,6 +581,7 @@ name = "client-connections"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "futures",
"log",
] ]
[[package]] [[package]]
@@ -8,13 +8,15 @@
use self::{ use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl, acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
}; };
use crate::client::real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors; use crate::{
use crate::client::{ client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender, inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
topology_control::TopologyAccessor, real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
}; };
use crate::spawn_future; use client_connections::{ClosedConnectionReceiver, LaneQueueLengths};
use client_connections::ClosedConnectionReceiver;
use futures::channel::mpsc; use futures::channel::mpsc;
use gateway_client::AcknowledgementReceiver; use gateway_client::AcknowledgementReceiver;
use log::*; use log::*;
@@ -104,6 +106,7 @@ where
// obviously when we finally make shared rng that is on 'higher' level, this should become // obviously when we finally make shared rng that is on 'higher' level, this should become
// generic `R` // generic `R`
impl RealMessagesController<OsRng> { impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
config: Config, config: Config,
ack_receiver: AcknowledgementReceiver, ack_receiver: AcknowledgementReceiver,
@@ -111,6 +114,7 @@ impl RealMessagesController<OsRng> {
mix_sender: BatchMixMessageSender, mix_sender: BatchMixMessageSender,
topology_access: TopologyAccessor, topology_access: TopologyAccessor,
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage, #[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver, closed_connection_rx: ClosedConnectionReceiver,
) -> Self { ) -> Self {
let rng = OsRng; let rng = OsRng;
@@ -161,6 +165,7 @@ impl RealMessagesController<OsRng> {
rng, rng,
config.self_recipient, config.self_recipient,
topology_access, topology_access,
lane_queue_lengths,
closed_connection_rx, closed_connection_rx,
); );
@@ -4,7 +4,9 @@
use crate::client::mix_traffic::BatchMixMessageSender; use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender; use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor; use crate::client::topology_control::TopologyAccessor;
use client_connections::{ClosedConnectionReceiver, ConnectionId, TransmissionLane}; use client_connections::{
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
@@ -134,6 +136,9 @@ where
/// Incoming channel for being notified of closed connections, so that we can close lanes /// Incoming channel for being notified of closed connections, so that we can close lanes
/// corresponding to connections. To avoid sending traffic unnecessary /// corresponding to connections. To avoid sending traffic unnecessary
closed_connection_rx: ClosedConnectionReceiver, closed_connection_rx: ClosedConnectionReceiver,
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
lane_queue_lengths: LaneQueueLengths,
} }
pub(crate) struct RealMessage { pub(crate) struct RealMessage {
@@ -177,6 +182,7 @@ where
rng: R, rng: R,
our_full_destination: Recipient, our_full_destination: Recipient,
topology_access: TopologyAccessor, topology_access: TopologyAccessor,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver, closed_connection_rx: ClosedConnectionReceiver,
) -> Self { ) -> Self {
OutQueueControl { OutQueueControl {
@@ -192,9 +198,14 @@ where
topology_access, topology_access,
transmission_buffer: Default::default(), transmission_buffer: Default::default(),
closed_connection_rx, closed_connection_rx,
lane_queue_lengths,
} }
} }
//pub fn get_lane_queue_length(&self) -> &LaneQueueLength {
// &self.lane_queue_length
//}
fn sent_notify(&self, frag_id: FragmentIdentifier) { fn sent_notify(&self, frag_id: FragmentIdentifier) {
// well technically the message was not sent just yet, but now it's up to internal // well technically the message was not sent just yet, but now it's up to internal
// queues and client load rather than the required delay. So realistically we can treat // queues and client load rather than the required delay. So realistically we can treat
@@ -272,9 +283,9 @@ where
} }
fn on_close_connection(&mut self, connection_id: ConnectionId) { fn on_close_connection(&mut self, connection_id: ConnectionId) {
log::debug!("Removing lane for connection: {connection_id}"); //log::debug!("Removing lane for connection: {connection_id}");
self.transmission_buffer //self.transmission_buffer
.remove(&TransmissionLane::ConnectionId(connection_id)); //.remove(&TransmissionLane::ConnectionId(connection_id));
} }
fn current_average_message_sending_delay(&self) -> Duration { fn current_average_message_sending_delay(&self) -> Duration {
@@ -309,6 +320,17 @@ where
} }
} }
fn pop_next_message(&mut self) -> Option<RealMessage> {
// Pop the next message from the transmission buffer
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
// Update the published queue length
let lane_length = self.transmission_buffer.lane_length(&lane);
self.lane_queue_lengths.set(&lane, lane_length);
Some(real_next)
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> { fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel // The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected. // (mix_tx) was detected.
@@ -359,16 +381,13 @@ where
log::trace!("handling real_messages: size: {}", real_messages.len()); log::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages); self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self let real_next = self.pop_next_message().expect("Just stored one");
.transmission_buffer
.pop_next_message_at_random()
.expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next)))) Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} }
Poll::Pending => { Poll::Pending => {
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() { if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next)))) Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else { } else {
// otherwise construct a dummy one // otherwise construct a dummy one
@@ -411,16 +430,13 @@ where
// First store what we got for the given connection id // First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages); self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self let real_next = self.pop_next_message().expect("we just added one");
.transmission_buffer
.pop_next_message_at_random()
.expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next)))) Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} }
Poll::Pending => { Poll::Pending => {
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() { if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next)))) Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else { } else {
Poll::Pending Poll::Pending
@@ -41,6 +41,10 @@ impl TransmissionBuffer {
self.buffer.keys().count() self.buffer.keys().count()
} }
pub(crate) fn lane_length(&self, lane: &TransmissionLane) -> Option<usize> {
self.buffer.get(lane).map(LaneBufferEntry::len)
}
#[allow(unused)] #[allow(unused)]
pub(crate) fn connections(&self) -> HashSet<u64> { pub(crate) fn connections(&self) -> HashSet<u64> {
self.buffer self.buffer
@@ -127,7 +131,7 @@ impl TransmissionBuffer {
Some(real_next) Some(real_next)
} }
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<RealMessage> { pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
if self.buffer.is_empty() { if self.buffer.is_empty() {
return None; return None;
} }
@@ -142,8 +146,9 @@ impl TransmissionBuffer {
*self.pick_random_lane()? *self.pick_random_lane()?
}; };
let msg = self.pop_front_from_lane(&lane)?;
log::trace!("picking to send from lane: {:?}", lane); log::trace!("picking to send from lane: {:?}", lane);
self.pop_front_from_lane(&lane) Some((lane, msg))
} }
pub(crate) fn prune_stale_connections(&mut self) { pub(crate) fn prune_stale_connections(&mut self) {
+13 -1
View File
@@ -1,7 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net> // Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, TransmissionLane}; use client_connections::{
ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths, TransmissionLane,
};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream; use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{ use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender, InputMessage, InputMessageReceiver, InputMessageSender,
@@ -119,6 +121,7 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver, ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver, input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender, mix_sender: BatchMixMessageSender,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver, closed_connection_rx: ClosedConnectionReceiver,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
@@ -149,6 +152,7 @@ impl NymClient {
mix_sender, mix_sender,
topology_accessor, topology_accessor,
reply_key_storage, reply_key_storage,
lane_queue_lengths,
closed_connection_rx, closed_connection_rx,
) )
.start_with_shutdown(shutdown); .start_with_shutdown(shutdown);
@@ -283,6 +287,7 @@ impl NymClient {
&self, &self,
buffer_requester: ReceivedBufferRequestSender, buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender, msg_input: InputMessageSender,
lane_queue_lengths: LaneQueueLengths,
closed_connection_tx: ClosedConnectionSender, closed_connection_tx: ClosedConnectionSender,
) { ) {
info!("Starting websocket listener..."); info!("Starting websocket listener...");
@@ -292,6 +297,7 @@ impl NymClient {
closed_connection_tx, closed_connection_tx,
buffer_requester, buffer_requester,
&self.as_mix_recipient(), &self.as_mix_recipient(),
lane_queue_lengths,
); );
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler); websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
@@ -417,12 +423,17 @@ impl NymClient {
// controller that connections are closed. // controller that connections are closed.
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded(); let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller( self.start_real_traffic_controller(
shared_topology_accessor.clone(), shared_topology_accessor.clone(),
reply_key_storage, reply_key_storage,
ack_receiver, ack_receiver,
input_receiver, input_receiver,
sphinx_message_sender.clone(), sphinx_message_sender.clone(),
shared_lane_queue_lengths.clone(),
closed_connection_rx, closed_connection_rx,
shutdown.subscribe(), shutdown.subscribe(),
); );
@@ -443,6 +454,7 @@ impl NymClient {
SocketType::WebSocket => self.start_websocket_listener( SocketType::WebSocket => self.start_websocket_listener(
received_buffer_request_sender, received_buffer_request_sender,
input_sender, input_sender,
shared_lane_queue_lengths,
closed_connection_tx, closed_connection_tx,
), ),
SocketType::None => { SocketType::None => {
+13 -2
View File
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net> // Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use client_connections::{ClosedConnectionSender, TransmissionLane}; use client_connections::{ClosedConnectionSender, LaneQueueLengths, TransmissionLane};
use client_core::client::{ use client_core::client::{
inbound_messages::{InputMessage, InputMessageSender}, inbound_messages::{InputMessage, InputMessageSender},
received_buffer::{ received_buffer::{
@@ -40,6 +40,7 @@ pub(crate) struct Handler {
self_full_address: Recipient, self_full_address: Recipient,
socket: Option<WebSocketStream<TcpStream>>, socket: Option<WebSocketStream<TcpStream>>,
received_response_type: ReceivedResponseType, received_response_type: ReceivedResponseType,
lane_queue_lengths: LaneQueueLengths,
} }
// clone is used to use handler on a new connection, which initially is `None` // clone is used to use handler on a new connection, which initially is `None`
@@ -52,6 +53,7 @@ impl Clone for Handler {
self_full_address: self.self_full_address, self_full_address: self.self_full_address,
socket: None, socket: None,
received_response_type: Default::default(), received_response_type: Default::default(),
lane_queue_lengths: self.lane_queue_lengths.clone(),
} }
} }
} }
@@ -70,6 +72,7 @@ impl Handler {
closed_connection_tx: ClosedConnectionSender, closed_connection_tx: ClosedConnectionSender,
buffer_requester: ReceivedBufferRequestSender, buffer_requester: ReceivedBufferRequestSender,
self_full_address: &Recipient, self_full_address: &Recipient,
lane_queue_lengths: LaneQueueLengths,
) -> Self { ) -> Self {
Handler { Handler {
msg_input, msg_input,
@@ -78,6 +81,7 @@ impl Handler {
self_full_address: *self_full_address, self_full_address: *self_full_address,
socket: None, socket: None,
received_response_type: Default::default(), received_response_type: Default::default(),
lane_queue_lengths,
} }
} }
@@ -93,7 +97,14 @@ impl Handler {
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane); let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
self.msg_input.unbounded_send(input_msg).unwrap(); self.msg_input.unbounded_send(input_msg).unwrap();
None // WIP(JON)
let queue_lengh = self
.lane_queue_lengths
.lock()
.unwrap()
.get(&lane)
.unwrap_or(0);
Some(ServerResponse::LaneQueueLength(connection_id, queue_lengh))
} }
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> { fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
+11 -1
View File
@@ -9,7 +9,7 @@ use crate::socks::{
authentication::{AuthenticationMethods, Authenticator, User}, authentication::{AuthenticationMethods, Authenticator, User},
server::SphinxSocksServer, server::SphinxSocksServer,
}; };
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender}; use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream; use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{ use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender, InputMessage, InputMessageReceiver, InputMessageSender,
@@ -120,6 +120,7 @@ impl NymClient {
input_receiver: InputMessageReceiver, input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender, mix_sender: BatchMixMessageSender,
closed_connection_rx: ClosedConnectionReceiver, closed_connection_rx: ClosedConnectionReceiver,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
let mut controller_config = client_core::client::real_messages_control::Config::new( let mut controller_config = client_core::client::real_messages_control::Config::new(
@@ -149,6 +150,7 @@ impl NymClient {
mix_sender, mix_sender,
topology_accessor, topology_accessor,
reply_key_storage, reply_key_storage,
lane_queue_lengths,
closed_connection_rx, closed_connection_rx,
) )
.start_with_shutdown(shutdown); .start_with_shutdown(shutdown);
@@ -284,6 +286,7 @@ impl NymClient {
buffer_requester: ReceivedBufferRequestSender, buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender, msg_input: InputMessageSender,
closed_connection_tx: ClosedConnectionSender, closed_connection_tx: ClosedConnectionSender,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
info!("Starting socks5 listener..."); info!("Starting socks5 listener...");
@@ -296,6 +299,7 @@ impl NymClient {
authenticator, authenticator,
self.config.get_provider_mix_address(), self.config.get_provider_mix_address(),
self.as_mix_recipient(), self.as_mix_recipient(),
lane_queue_lengths,
shutdown, shutdown,
); );
tokio::spawn(async move { tokio::spawn(async move {
@@ -409,6 +413,10 @@ impl NymClient {
// This will be forwarded to `OutQueueControl` // This will be forwarded to `OutQueueControl`
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded(); let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller( self.start_real_traffic_controller(
shared_topology_accessor.clone(), shared_topology_accessor.clone(),
reply_key_storage, reply_key_storage,
@@ -416,6 +424,7 @@ impl NymClient {
input_receiver, input_receiver,
sphinx_message_sender.clone(), sphinx_message_sender.clone(),
closed_connection_rx, closed_connection_rx,
shared_lane_queue_lengths.clone(),
shutdown.subscribe(), shutdown.subscribe(),
); );
@@ -435,6 +444,7 @@ impl NymClient {
received_buffer_request_sender, received_buffer_request_sender,
input_sender, input_sender,
closed_connection_tx, closed_connection_tx,
shared_lane_queue_lengths,
shutdown.subscribe(), shutdown.subscribe(),
); );
+49 -1
View File
@@ -4,7 +4,7 @@ use super::authentication::{AuthenticationMethods, Authenticator, User};
use super::request::{SocksCommand, SocksRequest}; use super::request::{SocksCommand, SocksRequest};
use super::types::{ResponseCode, SocksProxyError}; use super::types::{ResponseCode, SocksProxyError};
use super::{RESERVED, SOCKS_VERSION}; use super::{RESERVED, SOCKS_VERSION};
use client_connections::TransmissionLane; use client_connections::{LaneQueueLengths, TransmissionLane};
use client_core::client::inbound_messages::{InputMessage, InputMessageSender}; use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use task::ShutdownListener; use task::ShutdownListener;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::{self, net::TcpStream}; use tokio::{self, net::TcpStream};
@@ -141,7 +142,9 @@ pub(crate) struct SocksClient {
service_provider: Recipient, service_provider: Recipient,
self_address: Recipient, self_address: Recipient,
started_proxy: bool, started_proxy: bool,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener, shutdown_listener: ShutdownListener,
active_connections: Arc<std::sync::Mutex<u64>>,
} }
impl Drop for SocksClient { impl Drop for SocksClient {
@@ -165,9 +168,17 @@ impl SocksClient {
service_provider: Recipient, service_provider: Recipient,
controller_sender: ControllerSender, controller_sender: ControllerSender,
self_address: Recipient, self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener, shutdown_listener: ShutdownListener,
active_connections: Arc<std::sync::Mutex<u64>>,
) -> Self { ) -> Self {
let connection_id = Self::generate_random(); let connection_id = Self::generate_random();
let u_active_connections = {
let g = active_connections.lock().unwrap();
*g
};
error!("client active_connections: {}", u_active_connections);
SocksClient { SocksClient {
controller_sender, controller_sender,
connection_id, connection_id,
@@ -179,7 +190,9 @@ impl SocksClient {
service_provider, service_provider,
self_address, self_address,
started_proxy: false, started_proxy: false,
lane_queue_lengths,
shutdown_listener, shutdown_listener,
active_connections,
} }
} }
@@ -205,6 +218,8 @@ impl SocksClient {
/// is in use and that the client is authenticated, then runs the request. /// is in use and that the client is authenticated, then runs the request.
pub async fn run(&mut self) -> Result<(), SocksProxyError> { pub async fn run(&mut self) -> Result<(), SocksProxyError> {
debug!("New connection from: {}", self.stream.peer_addr()?.ip()); debug!("New connection from: {}", self.stream.peer_addr()?.ip());
//dbg!(&self.stream.peer_addr());
let mut header = [0u8; 2]; let mut header = [0u8; 2];
// Read a byte from the stream and determine the version being requested // Read a byte from the stream and determine the version being requested
self.stream.read_exact(&mut header).await?; self.stream.read_exact(&mut header).await?;
@@ -258,6 +273,7 @@ impl SocksClient {
conn_receiver, conn_receiver,
input_sender, input_sender,
connection_id, connection_id,
self.lane_queue_lengths.clone(),
self.shutdown_listener.clone(), self.shutdown_listener.clone(),
) )
.run(move |conn_id, read_data, socket_closed| { .run(move |conn_id, read_data, socket_closed| {
@@ -275,10 +291,24 @@ impl SocksClient {
/// Handles a client request. /// Handles a client request.
async fn handle_request(&mut self) -> Result<(), SocksProxyError> { async fn handle_request(&mut self) -> Result<(), SocksProxyError> {
debug!("Handling CONNECT Command"); debug!("Handling CONNECT Command");
let active_connections = {
let g = self.active_connections.lock().unwrap();
*g
};
error!("active_connections: {}", active_connections);
let request = SocksRequest::from_stream(&mut self.stream).await?; let request = SocksRequest::from_stream(&mut self.stream).await?;
let remote_address = request.to_string(); let remote_address = request.to_string();
if active_connections > 50 {
log::warn!(
"Refusing SOCKS5: too many connections: {}",
active_connections
);
self.refuse_connection_socks5().await;
return Ok(());
}
// setup for receiving from the mixnet // setup for receiving from the mixnet
let (mix_sender, mix_receiver) = mpsc::unbounded(); let (mix_sender, mix_receiver) = mpsc::unbounded();
@@ -332,6 +362,24 @@ impl SocksClient {
.unwrap(); .unwrap();
} }
async fn refuse_connection_socks5(&mut self) {
self.stream
.write_all(&[
SOCKS_VERSION,
ResponseCode::RuleFailure as u8,
RESERVED,
1,
127,
0,
0,
1,
0,
0,
])
.await
.unwrap();
}
/// Authenticate the incoming request. Each request is checked for its /// Authenticate the incoming request. Each request is checked for its
/// authentication method. A user/password request will extract the /// authentication method. A user/password request will extract the
/// username and password from the stream, then check with the Authenticator /// username and password from the stream, then check with the Authenticator
+18 -1
View File
@@ -4,7 +4,7 @@ use super::{
mixnet_responses::MixnetResponseListener, mixnet_responses::MixnetResponseListener,
types::{ResponseCode, SocksProxyError}, types::{ResponseCode, SocksProxyError},
}; };
use client_connections::ClosedConnectionSender; use client_connections::{ClosedConnectionSender, LaneQueueLengths};
use client_core::client::{ use client_core::client::{
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender, inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
}; };
@@ -12,6 +12,7 @@ use log::*;
use nymsphinx::addressing::clients::Recipient; use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::Controller; use proxy_helpers::connection_controller::Controller;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use task::ShutdownListener; use task::ShutdownListener;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@@ -21,6 +22,7 @@ pub struct SphinxSocksServer {
listening_address: SocketAddr, listening_address: SocketAddr,
service_provider: Recipient, service_provider: Recipient,
self_address: Recipient, self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
} }
@@ -31,6 +33,7 @@ impl SphinxSocksServer {
authenticator: Authenticator, authenticator: Authenticator,
service_provider: Recipient, service_provider: Recipient,
self_address: Recipient, self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) -> Self { ) -> Self {
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can // hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
@@ -42,6 +45,7 @@ impl SphinxSocksServer {
listening_address: format!("{}:{}", ip, port).parse().unwrap(), listening_address: format!("{}:{}", ip, port).parse().unwrap(),
service_provider, service_provider,
self_address, self_address,
lane_queue_lengths,
shutdown, shutdown,
} }
} }
@@ -74,10 +78,16 @@ impl SphinxSocksServer {
mixnet_response_listener.run().await; mixnet_response_listener.run().await;
}); });
let active_connections = Arc::new(std::sync::Mutex::new(0));
loop { loop {
tokio::select! { tokio::select! {
Ok((stream, _remote)) = listener.accept() => { Ok((stream, _remote)) = listener.accept() => {
// TODO Optimize this // TODO Optimize this
{
let mut g = active_connections.lock().unwrap();
*g += 1;
}
let mut client = SocksClient::new( let mut client = SocksClient::new(
stream, stream,
self.authenticator.clone(), self.authenticator.clone(),
@@ -85,9 +95,12 @@ impl SphinxSocksServer {
self.service_provider, self.service_provider,
controller_sender.clone(), controller_sender.clone(),
self.self_address, self.self_address,
self.lane_queue_lengths.clone(),
self.shutdown.clone(), self.shutdown.clone(),
active_connections.clone(),
); );
let active_connections = active_connections.clone();
tokio::spawn(async move { tokio::spawn(async move {
{ {
match client.run().await { match client.run().await {
@@ -118,6 +131,10 @@ impl SphinxSocksServer {
}; };
// client gets dropped here // client gets dropped here
} }
{
let mut g = active_connections.lock().unwrap();
*g -= 1;
}
}); });
}, },
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
+1
View File
@@ -7,3 +7,4 @@ edition = "2021"
[dependencies] [dependencies]
futures = "0.3" futures = "0.3"
log = "0.4.17"
+80
View File
@@ -1,6 +1,8 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net> // Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use futures::channel::mpsc; use futures::channel::mpsc;
pub type ConnectionId = u64; pub type ConnectionId = u64;
@@ -19,3 +21,81 @@ pub enum TransmissionLane {
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester) /// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>; pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>; pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
// if needed.
#[derive(Clone, Debug)]
pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
impl LaneQueueLengths {
pub fn new() -> Self {
LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
LaneQueueLengthsInner {
map: HashMap::new(),
},
)))
}
pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
match self.0.lock() {
Ok(mut inner) => {
if let Some(length) = lane_length {
inner
.map
.entry(*lane)
.and_modify(|e| *e = length)
.or_insert(length);
} else {
inner.map.remove(lane);
}
}
Err(err) => log::warn!("Failed to set lane queue length: {err}"),
}
}
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
match self.0.lock() {
Ok(inner) => inner.get(lane),
Err(err) => {
log::warn!("Failed to get lane queue length: {err}");
None
}
}
}
}
impl Default for LaneQueueLengths {
fn default() -> Self {
Self::new()
}
}
impl std::ops::Deref for LaneQueueLengths {
type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub struct LaneQueueLengthsInner {
pub map: HashMap<TransmissionLane, usize>,
}
impl LaneQueueLengthsInner {
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
self.map.get(lane).copied()
}
pub fn values(&self) -> impl Iterator<Item = &usize> {
self.map.values()
}
pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
where
F: FnOnce(&mut usize),
{
self.map.entry(*lane).and_modify(f);
}
}
@@ -5,11 +5,14 @@ use super::MixProxySender;
use super::SHUTDOWN_TIMEOUT; use super::SHUTDOWN_TIMEOUT;
use crate::available_reader::AvailableReader; use crate::available_reader::AvailableReader;
use bytes::Bytes; use bytes::Bytes;
use client_connections::LaneQueueLengths;
use client_connections::TransmissionLane;
use futures::FutureExt; use futures::FutureExt;
use futures::StreamExt; use futures::StreamExt;
use log::*; use log::*;
use ordered_buffer::OrderedMessageSender; use ordered_buffer::OrderedMessageSender;
use socks5_requests::ConnectionId; use socks5_requests::ConnectionId;
use std::time::Duration;
use std::{io, sync::Arc}; use std::{io, sync::Arc};
use task::ShutdownListener; use task::ShutdownListener;
use tokio::select; use tokio::select;
@@ -29,7 +32,7 @@ fn send_empty_close<F, S>(
.unwrap(); .unwrap();
} }
fn deal_with_data<F, S>( async fn deal_with_data<F, S>(
read_data: Option<io::Result<Bytes>>, read_data: Option<io::Result<Bytes>>,
local_destination_address: &str, local_destination_address: &str,
remote_source_address: &str, remote_source_address: &str,
@@ -37,6 +40,7 @@ fn deal_with_data<F, S>(
message_sender: &mut OrderedMessageSender, message_sender: &mut OrderedMessageSender,
mix_sender: &MixProxySender<S>, mix_sender: &MixProxySender<S>,
adapter_fn: F, adapter_fn: F,
lane_queue_lengths: LaneQueueLengths,
) -> bool ) -> bool
where where
F: Fn(ConnectionId, Vec<u8>, bool) -> S, F: Fn(ConnectionId, Vec<u8>, bool) -> S,
@@ -67,6 +71,70 @@ where
"pushing data down the input sender: size: {}", "pushing data down the input sender: size: {}",
ordered_msg.len() ordered_msg.len()
); );
// wait here until queue is not too long
let lane = TransmissionLane::ConnectionId(connection_id);
//loop {
{
let (queue_length, est_busy_conn) = {
let mut guard = lane_queue_lengths.lock().unwrap();
//let queue_length = *guard.get(&lane).unwrap_or(&0);
//let queue_length = *guard.entry(lane).or_insert(0);
let queue_length = guard.get(&lane).unwrap_or(0);
let est_busy_conn = guard.values().filter(|v| *v > &5).count();
// We estimate the queue length for subsequent data packet. This is needed
// because there is a delay until we get the correct value back as a server
// response. (And that will have a delay baked in).
// WIP(JON): pull packet size from somewhere
let sphinx_size = 2000.0;
let msg_length = (ordered_msg.len() as f64 / sphinx_size).ceil() as usize;
guard.modify(&lane, |length| *length += msg_length);
(queue_length, est_busy_conn)
};
log::info!("conn_id: {connection_id}, queue: {queue_length}");
// The heuristic here is:
// 20ms average delay => 50 packets / sec
// 500 packet queue => 10 sec behind
// This assumes it's the only active connection, and that there is no throttling
// In practive, this is a latency vs throughput tradeoff we're making here
let avererage_delay = 0.02; // TODO: read from config
let packets_per_sec = 1.0 / avererage_delay;
let ideal_time_to_clear_queue = queue_length as f64 / packets_per_sec;
if queue_length > 5000 {
log::info!("sleeping long");
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue * 5.0)).await;
} else if queue_length > 500 {
log::info!("sleeping medium");
sleep(Duration::from_secs_f64(
ideal_time_to_clear_queue * 2.0 / 3.0,
))
.await;
} else if queue_length > 5 {
log::info!("sleeping short");
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue / 3.0)).await;
}
// If we are saturated on number of connections, and this is already a busy
// connection, basically soft-stop
//if est_busy_conn > 15 && queue_length > 5 {
// log::info!("soft-stop: {connection_id}");
// sleep(Duration::from_secs_f64(5.0 * 60.0)).await;
//}
//loop {
// let count = ACTIVE_PROXIES.load(Ordering::Relaxed);
// if count + 1 > 15 {
// log::info!("Max connections reached, parking: {conn_id}");
// sleep(Duration::from_secs(10)).await;
// } else {
// break;
// }
//}
}
mix_sender mix_sender
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished)) .unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
.unwrap(); .unwrap();
@@ -88,6 +156,7 @@ pub(super) async fn run_inbound<F, S>(
mix_sender: MixProxySender<S>, mix_sender: MixProxySender<S>,
adapter_fn: F, adapter_fn: F,
shutdown_notify: Arc<Notify>, shutdown_notify: Arc<Notify>,
lane_queue_lengths: LaneQueueLengths,
mut shutdown_listener: ShutdownListener, mut shutdown_listener: ShutdownListener,
) -> OwnedReadHalf ) -> OwnedReadHalf
where where
@@ -102,12 +171,24 @@ where
loop { loop {
select! { select! {
read_data = &mut available_reader.next() => { read_data = &mut available_reader.next() => {
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) { if deal_with_data(
read_data,
&local_destination_address,
&remote_source_address,
connection_id,
&mut message_sender,
&mix_sender,
&adapter_fn,
lane_queue_lengths.clone()
).await {
break break
} }
} }
_ = &mut shutdown_future => { _ = &mut shutdown_future => {
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT); debug!(
"closing inbound proxy after outbound was closed {:?} ago",
SHUTDOWN_TIMEOUT
);
// inform remote just in case it was closed because of lack of heartbeat. // inform remote just in case it was closed because of lack of heartbeat.
// worst case the remote will just have couple of false negatives // worst case the remote will just have couple of false negatives
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn); send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::connection_controller::ConnectionReceiver; use crate::connection_controller::ConnectionReceiver;
use client_connections::LaneQueueLengths;
use futures::channel::mpsc; use futures::channel::mpsc;
use socks5_requests::ConnectionId; use socks5_requests::ConnectionId;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
@@ -12,7 +13,7 @@ mod inbound;
mod outbound; mod outbound;
// TODO: make this configurable // TODO: make this configurable
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30); const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug)] #[derive(Debug)]
pub struct ProxyMessage { pub struct ProxyMessage {
@@ -45,6 +46,7 @@ pub struct ProxyRunner<S> {
local_destination_address: String, local_destination_address: String,
remote_source_address: String, remote_source_address: String,
connection_id: ConnectionId, connection_id: ConnectionId,
lane_queue_lengths: LaneQueueLengths,
// Listens to shutdown commands from higher up // Listens to shutdown commands from higher up
shutdown_listener: ShutdownListener, shutdown_listener: ShutdownListener,
@@ -61,6 +63,7 @@ where
mix_receiver: ConnectionReceiver, mix_receiver: ConnectionReceiver,
mix_sender: MixProxySender<S>, mix_sender: MixProxySender<S>,
connection_id: ConnectionId, connection_id: ConnectionId,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener, shutdown_listener: ShutdownListener,
) -> Self { ) -> Self {
ProxyRunner { ProxyRunner {
@@ -70,6 +73,7 @@ where
local_destination_address, local_destination_address,
remote_source_address, remote_source_address,
connection_id, connection_id,
lane_queue_lengths,
shutdown_listener, shutdown_listener,
} }
} }
@@ -78,7 +82,7 @@ where
// request/response as required by entity running particular side of the proxy. // request/response as required by entity running particular side of the proxy.
pub async fn run<F>(mut self, adapter_fn: F) -> Self pub async fn run<F>(mut self, adapter_fn: F) -> Self
where where
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static, F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
{ {
let (read_half, write_half) = self.socket.take().unwrap().into_split(); let (read_half, write_half) = self.socket.take().unwrap().into_split();
let shutdown_notify = Arc::new(Notify::new()); let shutdown_notify = Arc::new(Notify::new());
@@ -92,6 +96,7 @@ where
self.mix_sender.clone(), self.mix_sender.clone(),
adapter_fn, adapter_fn,
Arc::clone(&shutdown_notify), Arc::clone(&shutdown_notify),
self.lane_queue_lengths.clone(),
self.shutdown_listener.clone(), self.shutdown_listener.clone(),
); );
@@ -14,6 +14,7 @@ use tokio::select;
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant}; use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
const MIX_TTL: Duration = Duration::from_secs(5 * 60); const MIX_TTL: Duration = Duration::from_secs(5 * 60);
//const MIX_TTL: Duration = Duration::from_secs(15 * 60);
async fn deal_with_message( async fn deal_with_message(
connection_message: ConnectionMessage, connection_message: ConnectionMessage,
@@ -1,6 +1,7 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net> // Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use client_connections::LaneQueueLengths;
use futures::channel::mpsc; use futures::channel::mpsc;
use nymsphinx::addressing::clients::Recipient; use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::ConnectionReceiver; use proxy_helpers::connection_controller::ConnectionReceiver;
@@ -41,6 +42,7 @@ impl Connection {
&mut self, &mut self,
mix_receiver: ConnectionReceiver, mix_receiver: ConnectionReceiver,
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>, mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
let stream = self.conn.take().unwrap(); let stream = self.conn.take().unwrap();
@@ -54,6 +56,7 @@ impl Connection {
mix_receiver, mix_receiver,
mix_sender, mix_sender,
connection_id, connection_id,
lane_queue_lengths,
shutdown, shutdown,
) )
.run(move |conn_id, read_data, socket_closed| { .run(move |conn_id, read_data, socket_closed| {
@@ -7,7 +7,7 @@ use crate::error::NetworkRequesterError;
use crate::statistics::ServiceStatisticsCollector; use crate::statistics::ServiceStatisticsCollector;
use crate::websocket; use crate::websocket;
use crate::websocket::TSWebsocketStream; use crate::websocket::TSWebsocketStream;
use client_connections::ClosedConnectionReceiver; use client_connections::{ClosedConnectionReceiver, LaneQueueLengths, TransmissionLane};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
@@ -118,6 +118,7 @@ impl ServiceProvider {
async fn read_websocket_message( async fn read_websocket_message(
websocket_reader: &mut SplitStream<TSWebsocketStream>, websocket_reader: &mut SplitStream<TSWebsocketStream>,
lane_queue_lengths: LaneQueueLengths,
) -> Option<ReconstructedMessage> { ) -> Option<ReconstructedMessage> {
while let Some(msg) = websocket_reader.next().await { while let Some(msg) = websocket_reader.next().await {
let data = msg let data = msg
@@ -138,6 +139,15 @@ impl ServiceProvider {
let received = match deserialized_message { let received = match deserialized_message {
ServerResponse::Received(received) => received, ServerResponse::Received(received) => received,
ServerResponse::LaneQueueLength(lane, queue_length) => {
log::info!(
"received LaneQueueLength lane: {lane}, queue_length: {queue_length}"
);
let lane = TransmissionLane::ConnectionId(lane);
let mut guard = lane_queue_lengths.lock().unwrap();
guard.map.insert(lane, queue_length);
continue;
}
ServerResponse::Error(err) => { ServerResponse::Error(err) => {
panic!("received error from native client! - {}", err) panic!("received error from native client! - {}", err)
} }
@@ -154,6 +164,7 @@ impl ServiceProvider {
return_address: Recipient, return_address: Recipient,
controller_sender: ControllerSender, controller_sender: ControllerSender,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>, mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await { let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
@@ -191,7 +202,7 @@ impl ServiceProvider {
); );
// run the proxy on the connection // run the proxy on the connection
conn.run_proxy(mix_receiver, mix_input_sender, shutdown) conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
.await; .await;
// proxy is done - remove the access channel from the controller // proxy is done - remove the access channel from the controller
@@ -207,6 +218,7 @@ impl ServiceProvider {
); );
} }
#[allow(clippy::too_many_arguments)]
fn handle_proxy_connect( fn handle_proxy_connect(
&mut self, &mut self,
controller_sender: &mut ControllerSender, controller_sender: &mut ControllerSender,
@@ -214,6 +226,7 @@ impl ServiceProvider {
conn_id: ConnectionId, conn_id: ConnectionId,
remote_addr: String, remote_addr: String,
return_address: Recipient, return_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) { if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
@@ -241,6 +254,7 @@ impl ServiceProvider {
return_address, return_address,
controller_sender_clone, controller_sender_clone,
mix_input_sender_clone, mix_input_sender_clone,
lane_queue_lengths,
shutdown, shutdown,
) )
.await .await
@@ -265,6 +279,7 @@ impl ServiceProvider {
controller_sender: &mut ControllerSender, controller_sender: &mut ControllerSender,
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>, mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
stats_collector: Option<ServiceStatisticsCollector>, stats_collector: Option<ServiceStatisticsCollector>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener, shutdown: ShutdownListener,
) { ) {
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) { let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
@@ -290,6 +305,7 @@ impl ServiceProvider {
req.conn_id, req.conn_id,
req.remote_addr, req.remote_addr,
req.return_address, req.return_address,
lane_queue_lengths,
shutdown, shutdown,
) )
} }
@@ -336,6 +352,10 @@ impl ServiceProvider {
// `ClientRequest`. // `ClientRequest`.
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded(); let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
// Controller for managing all active connections. // Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network // We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented. // requester shutdown signalling is not yet fully implemented.
@@ -376,7 +396,12 @@ impl ServiceProvider {
println!("\nAll systems go. Press CTRL-C to stop the server."); println!("\nAll systems go. Press CTRL-C to stop the server.");
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message) // for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
loop { loop {
let received = match Self::read_websocket_message(&mut websocket_reader).await { let received = match Self::read_websocket_message(
&mut websocket_reader,
shared_lane_queue_lengths.clone(),
)
.await
{
Some(msg) => msg, Some(msg) => msg,
None => { None => {
error!("The websocket stream has finished!"); error!("The websocket stream has finished!");
@@ -392,6 +417,7 @@ impl ServiceProvider {
&mut controller_sender, &mut controller_sender,
&mix_input_sender, &mix_input_sender,
stats_collector.clone(), stats_collector.clone(),
shared_lane_queue_lengths.clone(),
shutdown.subscribe(), shutdown.subscribe(),
) )
.await; .await;