Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad d21cbc9746 Update names 2022-11-19 11:06:57 +01:00
Jon Häggblad d4d739e267 WIP 2022-11-19 10:50:06 +01:00
Jon Häggblad 433214a9d8 Fix clippy 2022-11-19 10:41:23 +01:00
Jon Häggblad fb74d243d5 client-connections: rename to LaneQueueLenghts plural 2022-11-19 10:30:49 +01:00
Jon Häggblad ee54efe54b client-core: publish lane queue lengths 2022-11-19 10:20:08 +01:00
16 changed files with 358 additions and 36 deletions
Generated
+1
View File
@@ -581,6 +581,7 @@ name = "client-connections"
version = "0.1.0"
dependencies = [
"futures",
"log",
]
[[package]]
@@ -8,13 +8,15 @@
use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
use crate::client::real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors;
use crate::client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
topology_control::TopologyAccessor,
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
};
use crate::spawn_future;
use client_connections::ClosedConnectionReceiver;
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths};
use futures::channel::mpsc;
use gateway_client::AcknowledgementReceiver;
use log::*;
@@ -104,6 +106,7 @@ where
// obviously when we finally make shared rng that is on 'higher' level, this should become
// generic `R`
impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config,
ack_receiver: AcknowledgementReceiver,
@@ -111,6 +114,7 @@ impl RealMessagesController<OsRng> {
mix_sender: BatchMixMessageSender,
topology_access: TopologyAccessor,
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver,
) -> Self {
let rng = OsRng;
@@ -161,6 +165,7 @@ impl RealMessagesController<OsRng> {
rng,
config.self_recipient,
topology_access,
lane_queue_lengths,
closed_connection_rx,
);
@@ -4,7 +4,9 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use client_connections::{ClosedConnectionReceiver, ConnectionId, TransmissionLane};
use client_connections::{
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
@@ -134,6 +136,9 @@ where
/// Incoming channel for being notified of closed connections, so that we can close lanes
/// corresponding to connections. To avoid sending traffic unnecessary
closed_connection_rx: ClosedConnectionReceiver,
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
lane_queue_lengths: LaneQueueLengths,
}
pub(crate) struct RealMessage {
@@ -177,6 +182,7 @@ where
rng: R,
our_full_destination: Recipient,
topology_access: TopologyAccessor,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver,
) -> Self {
OutQueueControl {
@@ -192,9 +198,14 @@ where
topology_access,
transmission_buffer: Default::default(),
closed_connection_rx,
lane_queue_lengths,
}
}
//pub fn get_lane_queue_length(&self) -> &LaneQueueLength {
// &self.lane_queue_length
//}
fn sent_notify(&self, frag_id: FragmentIdentifier) {
// well technically the message was not sent just yet, but now it's up to internal
// queues and client load rather than the required delay. So realistically we can treat
@@ -272,9 +283,9 @@ where
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
log::debug!("Removing lane for connection: {connection_id}");
self.transmission_buffer
.remove(&TransmissionLane::ConnectionId(connection_id));
//log::debug!("Removing lane for connection: {connection_id}");
//self.transmission_buffer
//.remove(&TransmissionLane::ConnectionId(connection_id));
}
fn current_average_message_sending_delay(&self) -> Duration {
@@ -309,6 +320,17 @@ where
}
}
fn pop_next_message(&mut self) -> Option<RealMessage> {
// Pop the next message from the transmission buffer
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
// Update the published queue length
let lane_length = self.transmission_buffer.lane_length(&lane);
self.lane_queue_lengths.set(&lane, lane_length);
Some(real_next)
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected.
@@ -359,16 +381,13 @@ where
log::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self
.transmission_buffer
.pop_next_message_at_random()
.expect("we just added one");
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
}
Poll::Pending => {
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else {
// otherwise construct a dummy one
@@ -411,16 +430,13 @@ where
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self
.transmission_buffer
.pop_next_message_at_random()
.expect("we just added one");
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
}
Poll::Pending => {
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else {
Poll::Pending
@@ -41,6 +41,10 @@ impl TransmissionBuffer {
self.buffer.keys().count()
}
pub(crate) fn lane_length(&self, lane: &TransmissionLane) -> Option<usize> {
self.buffer.get(lane).map(LaneBufferEntry::len)
}
#[allow(unused)]
pub(crate) fn connections(&self) -> HashSet<u64> {
self.buffer
@@ -127,7 +131,7 @@ impl TransmissionBuffer {
Some(real_next)
}
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<RealMessage> {
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
if self.buffer.is_empty() {
return None;
}
@@ -142,8 +146,9 @@ impl TransmissionBuffer {
*self.pick_random_lane()?
};
let msg = self.pop_front_from_lane(&lane)?;
log::trace!("picking to send from lane: {:?}", lane);
self.pop_front_from_lane(&lane)
Some((lane, msg))
}
pub(crate) fn prune_stale_connections(&mut self) {
+13 -1
View File
@@ -1,7 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, TransmissionLane};
use client_connections::{
ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths, TransmissionLane,
};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
@@ -119,6 +121,7 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
lane_queue_lengths: LaneQueueLengths,
closed_connection_rx: ClosedConnectionReceiver,
shutdown: ShutdownListener,
) {
@@ -149,6 +152,7 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
lane_queue_lengths,
closed_connection_rx,
)
.start_with_shutdown(shutdown);
@@ -283,6 +287,7 @@ impl NymClient {
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
lane_queue_lengths: LaneQueueLengths,
closed_connection_tx: ClosedConnectionSender,
) {
info!("Starting websocket listener...");
@@ -292,6 +297,7 @@ impl NymClient {
closed_connection_tx,
buffer_requester,
&self.as_mix_recipient(),
lane_queue_lengths,
);
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
@@ -417,12 +423,17 @@ impl NymClient {
// controller that connections are closed.
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
shared_lane_queue_lengths.clone(),
closed_connection_rx,
shutdown.subscribe(),
);
@@ -443,6 +454,7 @@ impl NymClient {
SocketType::WebSocket => self.start_websocket_listener(
received_buffer_request_sender,
input_sender,
shared_lane_queue_lengths,
closed_connection_tx,
),
SocketType::None => {
+13 -2
View File
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::{ClosedConnectionSender, TransmissionLane};
use client_connections::{ClosedConnectionSender, LaneQueueLengths, TransmissionLane};
use client_core::client::{
inbound_messages::{InputMessage, InputMessageSender},
received_buffer::{
@@ -40,6 +40,7 @@ pub(crate) struct Handler {
self_full_address: Recipient,
socket: Option<WebSocketStream<TcpStream>>,
received_response_type: ReceivedResponseType,
lane_queue_lengths: LaneQueueLengths,
}
// clone is used to use handler on a new connection, which initially is `None`
@@ -52,6 +53,7 @@ impl Clone for Handler {
self_full_address: self.self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths: self.lane_queue_lengths.clone(),
}
}
}
@@ -70,6 +72,7 @@ impl Handler {
closed_connection_tx: ClosedConnectionSender,
buffer_requester: ReceivedBufferRequestSender,
self_full_address: &Recipient,
lane_queue_lengths: LaneQueueLengths,
) -> Self {
Handler {
msg_input,
@@ -78,6 +81,7 @@ impl Handler {
self_full_address: *self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths,
}
}
@@ -93,7 +97,14 @@ impl Handler {
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
self.msg_input.unbounded_send(input_msg).unwrap();
None
// WIP(JON)
let queue_lengh = self
.lane_queue_lengths
.lock()
.unwrap()
.get(&lane)
.unwrap_or(0);
Some(ServerResponse::LaneQueueLength(connection_id, queue_lengh))
}
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
+11 -1
View File
@@ -9,7 +9,7 @@ use crate::socks::{
authentication::{AuthenticationMethods, Authenticator, User},
server::SphinxSocksServer,
};
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender};
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
@@ -120,6 +120,7 @@ impl NymClient {
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
closed_connection_rx: ClosedConnectionReceiver,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let mut controller_config = client_core::client::real_messages_control::Config::new(
@@ -149,6 +150,7 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
lane_queue_lengths,
closed_connection_rx,
)
.start_with_shutdown(shutdown);
@@ -284,6 +286,7 @@ impl NymClient {
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
closed_connection_tx: ClosedConnectionSender,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
info!("Starting socks5 listener...");
@@ -296,6 +299,7 @@ impl NymClient {
authenticator,
self.config.get_provider_mix_address(),
self.as_mix_recipient(),
lane_queue_lengths,
shutdown,
);
tokio::spawn(async move {
@@ -409,6 +413,10 @@ impl NymClient {
// This will be forwarded to `OutQueueControl`
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
@@ -416,6 +424,7 @@ impl NymClient {
input_receiver,
sphinx_message_sender.clone(),
closed_connection_rx,
shared_lane_queue_lengths.clone(),
shutdown.subscribe(),
);
@@ -435,6 +444,7 @@ impl NymClient {
received_buffer_request_sender,
input_sender,
closed_connection_tx,
shared_lane_queue_lengths,
shutdown.subscribe(),
);
+49 -1
View File
@@ -4,7 +4,7 @@ use super::authentication::{AuthenticationMethods, Authenticator, User};
use super::request::{SocksCommand, SocksRequest};
use super::types::{ResponseCode, SocksProxyError};
use super::{RESERVED, SOCKS_VERSION};
use client_connections::TransmissionLane;
use client_connections::{LaneQueueLengths, TransmissionLane};
use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
use futures::channel::mpsc;
use futures::task::{Context, Poll};
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::{self, net::TcpStream};
@@ -141,7 +142,9 @@ pub(crate) struct SocksClient {
service_provider: Recipient,
self_address: Recipient,
started_proxy: bool,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener,
active_connections: Arc<std::sync::Mutex<u64>>,
}
impl Drop for SocksClient {
@@ -165,9 +168,17 @@ impl SocksClient {
service_provider: Recipient,
controller_sender: ControllerSender,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener,
active_connections: Arc<std::sync::Mutex<u64>>,
) -> Self {
let connection_id = Self::generate_random();
let u_active_connections = {
let g = active_connections.lock().unwrap();
*g
};
error!("client active_connections: {}", u_active_connections);
SocksClient {
controller_sender,
connection_id,
@@ -179,7 +190,9 @@ impl SocksClient {
service_provider,
self_address,
started_proxy: false,
lane_queue_lengths,
shutdown_listener,
active_connections,
}
}
@@ -205,6 +218,8 @@ impl SocksClient {
/// is in use and that the client is authenticated, then runs the request.
pub async fn run(&mut self) -> Result<(), SocksProxyError> {
debug!("New connection from: {}", self.stream.peer_addr()?.ip());
//dbg!(&self.stream.peer_addr());
let mut header = [0u8; 2];
// Read a byte from the stream and determine the version being requested
self.stream.read_exact(&mut header).await?;
@@ -258,6 +273,7 @@ impl SocksClient {
conn_receiver,
input_sender,
connection_id,
self.lane_queue_lengths.clone(),
self.shutdown_listener.clone(),
)
.run(move |conn_id, read_data, socket_closed| {
@@ -275,10 +291,24 @@ impl SocksClient {
/// Handles a client request.
async fn handle_request(&mut self) -> Result<(), SocksProxyError> {
debug!("Handling CONNECT Command");
let active_connections = {
let g = self.active_connections.lock().unwrap();
*g
};
error!("active_connections: {}", active_connections);
let request = SocksRequest::from_stream(&mut self.stream).await?;
let remote_address = request.to_string();
if active_connections > 50 {
log::warn!(
"Refusing SOCKS5: too many connections: {}",
active_connections
);
self.refuse_connection_socks5().await;
return Ok(());
}
// setup for receiving from the mixnet
let (mix_sender, mix_receiver) = mpsc::unbounded();
@@ -332,6 +362,24 @@ impl SocksClient {
.unwrap();
}
async fn refuse_connection_socks5(&mut self) {
self.stream
.write_all(&[
SOCKS_VERSION,
ResponseCode::RuleFailure as u8,
RESERVED,
1,
127,
0,
0,
1,
0,
0,
])
.await
.unwrap();
}
/// Authenticate the incoming request. Each request is checked for its
/// authentication method. A user/password request will extract the
/// username and password from the stream, then check with the Authenticator
+18 -1
View File
@@ -4,7 +4,7 @@ use super::{
mixnet_responses::MixnetResponseListener,
types::{ResponseCode, SocksProxyError},
};
use client_connections::ClosedConnectionSender;
use client_connections::{ClosedConnectionSender, LaneQueueLengths};
use client_core::client::{
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
};
@@ -12,6 +12,7 @@ use log::*;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::Controller;
use std::net::SocketAddr;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::net::TcpListener;
@@ -21,6 +22,7 @@ pub struct SphinxSocksServer {
listening_address: SocketAddr,
service_provider: Recipient,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
}
@@ -31,6 +33,7 @@ impl SphinxSocksServer {
authenticator: Authenticator,
service_provider: Recipient,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) -> Self {
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
@@ -42,6 +45,7 @@ impl SphinxSocksServer {
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
service_provider,
self_address,
lane_queue_lengths,
shutdown,
}
}
@@ -74,10 +78,16 @@ impl SphinxSocksServer {
mixnet_response_listener.run().await;
});
let active_connections = Arc::new(std::sync::Mutex::new(0));
loop {
tokio::select! {
Ok((stream, _remote)) = listener.accept() => {
// TODO Optimize this
{
let mut g = active_connections.lock().unwrap();
*g += 1;
}
let mut client = SocksClient::new(
stream,
self.authenticator.clone(),
@@ -85,9 +95,12 @@ impl SphinxSocksServer {
self.service_provider,
controller_sender.clone(),
self.self_address,
self.lane_queue_lengths.clone(),
self.shutdown.clone(),
active_connections.clone(),
);
let active_connections = active_connections.clone();
tokio::spawn(async move {
{
match client.run().await {
@@ -118,6 +131,10 @@ impl SphinxSocksServer {
};
// client gets dropped here
}
{
let mut g = active_connections.lock().unwrap();
*g -= 1;
}
});
},
_ = self.shutdown.recv() => {
+1
View File
@@ -7,3 +7,4 @@ edition = "2021"
[dependencies]
futures = "0.3"
log = "0.4.17"
+80
View File
@@ -1,6 +1,8 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use futures::channel::mpsc;
pub type ConnectionId = u64;
@@ -19,3 +21,81 @@ pub enum TransmissionLane {
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
// if needed.
#[derive(Clone, Debug)]
pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
impl LaneQueueLengths {
pub fn new() -> Self {
LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
LaneQueueLengthsInner {
map: HashMap::new(),
},
)))
}
pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
match self.0.lock() {
Ok(mut inner) => {
if let Some(length) = lane_length {
inner
.map
.entry(*lane)
.and_modify(|e| *e = length)
.or_insert(length);
} else {
inner.map.remove(lane);
}
}
Err(err) => log::warn!("Failed to set lane queue length: {err}"),
}
}
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
match self.0.lock() {
Ok(inner) => inner.get(lane),
Err(err) => {
log::warn!("Failed to get lane queue length: {err}");
None
}
}
}
}
impl Default for LaneQueueLengths {
fn default() -> Self {
Self::new()
}
}
impl std::ops::Deref for LaneQueueLengths {
type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub struct LaneQueueLengthsInner {
pub map: HashMap<TransmissionLane, usize>,
}
impl LaneQueueLengthsInner {
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
self.map.get(lane).copied()
}
pub fn values(&self) -> impl Iterator<Item = &usize> {
self.map.values()
}
pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
where
F: FnOnce(&mut usize),
{
self.map.entry(*lane).and_modify(f);
}
}
@@ -5,11 +5,14 @@ use super::MixProxySender;
use super::SHUTDOWN_TIMEOUT;
use crate::available_reader::AvailableReader;
use bytes::Bytes;
use client_connections::LaneQueueLengths;
use client_connections::TransmissionLane;
use futures::FutureExt;
use futures::StreamExt;
use log::*;
use ordered_buffer::OrderedMessageSender;
use socks5_requests::ConnectionId;
use std::time::Duration;
use std::{io, sync::Arc};
use task::ShutdownListener;
use tokio::select;
@@ -29,7 +32,7 @@ fn send_empty_close<F, S>(
.unwrap();
}
fn deal_with_data<F, S>(
async fn deal_with_data<F, S>(
read_data: Option<io::Result<Bytes>>,
local_destination_address: &str,
remote_source_address: &str,
@@ -37,6 +40,7 @@ fn deal_with_data<F, S>(
message_sender: &mut OrderedMessageSender,
mix_sender: &MixProxySender<S>,
adapter_fn: F,
lane_queue_lengths: LaneQueueLengths,
) -> bool
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
@@ -67,6 +71,70 @@ where
"pushing data down the input sender: size: {}",
ordered_msg.len()
);
// wait here until queue is not too long
let lane = TransmissionLane::ConnectionId(connection_id);
//loop {
{
let (queue_length, est_busy_conn) = {
let mut guard = lane_queue_lengths.lock().unwrap();
//let queue_length = *guard.get(&lane).unwrap_or(&0);
//let queue_length = *guard.entry(lane).or_insert(0);
let queue_length = guard.get(&lane).unwrap_or(0);
let est_busy_conn = guard.values().filter(|v| *v > &5).count();
// We estimate the queue length for subsequent data packet. This is needed
// because there is a delay until we get the correct value back as a server
// response. (And that will have a delay baked in).
// WIP(JON): pull packet size from somewhere
let sphinx_size = 2000.0;
let msg_length = (ordered_msg.len() as f64 / sphinx_size).ceil() as usize;
guard.modify(&lane, |length| *length += msg_length);
(queue_length, est_busy_conn)
};
log::info!("conn_id: {connection_id}, queue: {queue_length}");
// The heuristic here is:
// 20ms average delay => 50 packets / sec
// 500 packet queue => 10 sec behind
// This assumes it's the only active connection, and that there is no throttling
// In practive, this is a latency vs throughput tradeoff we're making here
let avererage_delay = 0.02; // TODO: read from config
let packets_per_sec = 1.0 / avererage_delay;
let ideal_time_to_clear_queue = queue_length as f64 / packets_per_sec;
if queue_length > 5000 {
log::info!("sleeping long");
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue * 5.0)).await;
} else if queue_length > 500 {
log::info!("sleeping medium");
sleep(Duration::from_secs_f64(
ideal_time_to_clear_queue * 2.0 / 3.0,
))
.await;
} else if queue_length > 5 {
log::info!("sleeping short");
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue / 3.0)).await;
}
// If we are saturated on number of connections, and this is already a busy
// connection, basically soft-stop
//if est_busy_conn > 15 && queue_length > 5 {
// log::info!("soft-stop: {connection_id}");
// sleep(Duration::from_secs_f64(5.0 * 60.0)).await;
//}
//loop {
// let count = ACTIVE_PROXIES.load(Ordering::Relaxed);
// if count + 1 > 15 {
// log::info!("Max connections reached, parking: {conn_id}");
// sleep(Duration::from_secs(10)).await;
// } else {
// break;
// }
//}
}
mix_sender
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
.unwrap();
@@ -88,6 +156,7 @@ pub(super) async fn run_inbound<F, S>(
mix_sender: MixProxySender<S>,
adapter_fn: F,
shutdown_notify: Arc<Notify>,
lane_queue_lengths: LaneQueueLengths,
mut shutdown_listener: ShutdownListener,
) -> OwnedReadHalf
where
@@ -102,12 +171,24 @@ where
loop {
select! {
read_data = &mut available_reader.next() => {
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
if deal_with_data(
read_data,
&local_destination_address,
&remote_source_address,
connection_id,
&mut message_sender,
&mix_sender,
&adapter_fn,
lane_queue_lengths.clone()
).await {
break
}
}
_ = &mut shutdown_future => {
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
debug!(
"closing inbound proxy after outbound was closed {:?} ago",
SHUTDOWN_TIMEOUT
);
// inform remote just in case it was closed because of lack of heartbeat.
// worst case the remote will just have couple of false negatives
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::connection_controller::ConnectionReceiver;
use client_connections::LaneQueueLengths;
use futures::channel::mpsc;
use socks5_requests::ConnectionId;
use std::{sync::Arc, time::Duration};
@@ -12,7 +13,7 @@ mod inbound;
mod outbound;
// TODO: make this configurable
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Debug)]
pub struct ProxyMessage {
@@ -45,6 +46,7 @@ pub struct ProxyRunner<S> {
local_destination_address: String,
remote_source_address: String,
connection_id: ConnectionId,
lane_queue_lengths: LaneQueueLengths,
// Listens to shutdown commands from higher up
shutdown_listener: ShutdownListener,
@@ -61,6 +63,7 @@ where
mix_receiver: ConnectionReceiver,
mix_sender: MixProxySender<S>,
connection_id: ConnectionId,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener,
) -> Self {
ProxyRunner {
@@ -70,6 +73,7 @@ where
local_destination_address,
remote_source_address,
connection_id,
lane_queue_lengths,
shutdown_listener,
}
}
@@ -78,7 +82,7 @@ where
// request/response as required by entity running particular side of the proxy.
pub async fn run<F>(mut self, adapter_fn: F) -> Self
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
{
let (read_half, write_half) = self.socket.take().unwrap().into_split();
let shutdown_notify = Arc::new(Notify::new());
@@ -92,6 +96,7 @@ where
self.mix_sender.clone(),
adapter_fn,
Arc::clone(&shutdown_notify),
self.lane_queue_lengths.clone(),
self.shutdown_listener.clone(),
);
@@ -14,6 +14,7 @@ use tokio::select;
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
const MIX_TTL: Duration = Duration::from_secs(5 * 60);
//const MIX_TTL: Duration = Duration::from_secs(15 * 60);
async fn deal_with_message(
connection_message: ConnectionMessage,
@@ -1,6 +1,7 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::LaneQueueLengths;
use futures::channel::mpsc;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::ConnectionReceiver;
@@ -41,6 +42,7 @@ impl Connection {
&mut self,
mix_receiver: ConnectionReceiver,
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let stream = self.conn.take().unwrap();
@@ -54,6 +56,7 @@ impl Connection {
mix_receiver,
mix_sender,
connection_id,
lane_queue_lengths,
shutdown,
)
.run(move |conn_id, read_data, socket_closed| {
@@ -7,7 +7,7 @@ use crate::error::NetworkRequesterError;
use crate::statistics::ServiceStatisticsCollector;
use crate::websocket;
use crate::websocket::TSWebsocketStream;
use client_connections::ClosedConnectionReceiver;
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths, TransmissionLane};
use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
@@ -118,6 +118,7 @@ impl ServiceProvider {
async fn read_websocket_message(
websocket_reader: &mut SplitStream<TSWebsocketStream>,
lane_queue_lengths: LaneQueueLengths,
) -> Option<ReconstructedMessage> {
while let Some(msg) = websocket_reader.next().await {
let data = msg
@@ -138,6 +139,15 @@ impl ServiceProvider {
let received = match deserialized_message {
ServerResponse::Received(received) => received,
ServerResponse::LaneQueueLength(lane, queue_length) => {
log::info!(
"received LaneQueueLength lane: {lane}, queue_length: {queue_length}"
);
let lane = TransmissionLane::ConnectionId(lane);
let mut guard = lane_queue_lengths.lock().unwrap();
guard.map.insert(lane, queue_length);
continue;
}
ServerResponse::Error(err) => {
panic!("received error from native client! - {}", err)
}
@@ -154,6 +164,7 @@ impl ServiceProvider {
return_address: Recipient,
controller_sender: ControllerSender,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
@@ -191,7 +202,7 @@ impl ServiceProvider {
);
// run the proxy on the connection
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
.await;
// proxy is done - remove the access channel from the controller
@@ -207,6 +218,7 @@ impl ServiceProvider {
);
}
#[allow(clippy::too_many_arguments)]
fn handle_proxy_connect(
&mut self,
controller_sender: &mut ControllerSender,
@@ -214,6 +226,7 @@ impl ServiceProvider {
conn_id: ConnectionId,
remote_addr: String,
return_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
@@ -241,6 +254,7 @@ impl ServiceProvider {
return_address,
controller_sender_clone,
mix_input_sender_clone,
lane_queue_lengths,
shutdown,
)
.await
@@ -265,6 +279,7 @@ impl ServiceProvider {
controller_sender: &mut ControllerSender,
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
stats_collector: Option<ServiceStatisticsCollector>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
@@ -290,6 +305,7 @@ impl ServiceProvider {
req.conn_id,
req.remote_addr,
req.return_address,
lane_queue_lengths,
shutdown,
)
}
@@ -336,6 +352,10 @@ impl ServiceProvider {
// `ClientRequest`.
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented.
@@ -376,7 +396,12 @@ impl ServiceProvider {
println!("\nAll systems go. Press CTRL-C to stop the server.");
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
loop {
let received = match Self::read_websocket_message(&mut websocket_reader).await {
let received = match Self::read_websocket_message(
&mut websocket_reader,
shared_lane_queue_lengths.clone(),
)
.await
{
Some(msg) => msg,
None => {
error!("The websocket stream has finished!");
@@ -392,6 +417,7 @@ impl ServiceProvider {
&mut controller_sender,
&mix_input_sender,
stats_collector.clone(),
shared_lane_queue_lengths.clone(),
shutdown.subscribe(),
)
.await;