Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d21cbc9746 | |||
| d4d739e267 | |||
| 433214a9d8 | |||
| fb74d243d5 | |||
| ee54efe54b |
Generated
+1
@@ -581,6 +581,7 @@ name = "client-connections"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -8,13 +8,15 @@
|
||||
use self::{
|
||||
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
|
||||
};
|
||||
use crate::client::real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors;
|
||||
use crate::client::{
|
||||
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
|
||||
topology_control::TopologyAccessor,
|
||||
use crate::{
|
||||
client::{
|
||||
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
|
||||
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
|
||||
topology_control::TopologyAccessor,
|
||||
},
|
||||
spawn_future,
|
||||
};
|
||||
use crate::spawn_future;
|
||||
use client_connections::ClosedConnectionReceiver;
|
||||
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths};
|
||||
use futures::channel::mpsc;
|
||||
use gateway_client::AcknowledgementReceiver;
|
||||
use log::*;
|
||||
@@ -104,6 +106,7 @@ where
|
||||
// obviously when we finally make shared rng that is on 'higher' level, this should become
|
||||
// generic `R`
|
||||
impl RealMessagesController<OsRng> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
config: Config,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
@@ -111,6 +114,7 @@ impl RealMessagesController<OsRng> {
|
||||
mix_sender: BatchMixMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
closed_connection_rx: ClosedConnectionReceiver,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
@@ -161,6 +165,7 @@ impl RealMessagesController<OsRng> {
|
||||
rng,
|
||||
config.self_recipient,
|
||||
topology_access,
|
||||
lane_queue_lengths,
|
||||
closed_connection_rx,
|
||||
);
|
||||
|
||||
|
||||
@@ -4,7 +4,9 @@
|
||||
use crate::client::mix_traffic::BatchMixMessageSender;
|
||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use client_connections::{ClosedConnectionReceiver, ConnectionId, TransmissionLane};
|
||||
use client_connections::{
|
||||
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use futures::channel::mpsc;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
@@ -134,6 +136,9 @@ where
|
||||
/// Incoming channel for being notified of closed connections, so that we can close lanes
|
||||
/// corresponding to connections. To avoid sending traffic unnecessary
|
||||
closed_connection_rx: ClosedConnectionReceiver,
|
||||
|
||||
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
}
|
||||
|
||||
pub(crate) struct RealMessage {
|
||||
@@ -177,6 +182,7 @@ where
|
||||
rng: R,
|
||||
our_full_destination: Recipient,
|
||||
topology_access: TopologyAccessor,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
closed_connection_rx: ClosedConnectionReceiver,
|
||||
) -> Self {
|
||||
OutQueueControl {
|
||||
@@ -192,9 +198,14 @@ where
|
||||
topology_access,
|
||||
transmission_buffer: Default::default(),
|
||||
closed_connection_rx,
|
||||
lane_queue_lengths,
|
||||
}
|
||||
}
|
||||
|
||||
//pub fn get_lane_queue_length(&self) -> &LaneQueueLength {
|
||||
// &self.lane_queue_length
|
||||
//}
|
||||
|
||||
fn sent_notify(&self, frag_id: FragmentIdentifier) {
|
||||
// well technically the message was not sent just yet, but now it's up to internal
|
||||
// queues and client load rather than the required delay. So realistically we can treat
|
||||
@@ -272,9 +283,9 @@ where
|
||||
}
|
||||
|
||||
fn on_close_connection(&mut self, connection_id: ConnectionId) {
|
||||
log::debug!("Removing lane for connection: {connection_id}");
|
||||
self.transmission_buffer
|
||||
.remove(&TransmissionLane::ConnectionId(connection_id));
|
||||
//log::debug!("Removing lane for connection: {connection_id}");
|
||||
//self.transmission_buffer
|
||||
//.remove(&TransmissionLane::ConnectionId(connection_id));
|
||||
}
|
||||
|
||||
fn current_average_message_sending_delay(&self) -> Duration {
|
||||
@@ -309,6 +320,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_next_message(&mut self) -> Option<RealMessage> {
|
||||
// Pop the next message from the transmission buffer
|
||||
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
|
||||
|
||||
// Update the published queue length
|
||||
let lane_length = self.transmission_buffer.lane_length(&lane);
|
||||
self.lane_queue_lengths.set(&lane, lane_length);
|
||||
|
||||
Some(real_next)
|
||||
}
|
||||
|
||||
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||
// The average delay could change depending on if backpressure in the downstream channel
|
||||
// (mix_tx) was detected.
|
||||
@@ -359,16 +381,13 @@ where
|
||||
log::trace!("handling real_messages: size: {}", real_messages.len());
|
||||
|
||||
self.transmission_buffer.store(&conn_id, real_messages);
|
||||
let real_next = self
|
||||
.transmission_buffer
|
||||
.pop_next_message_at_random()
|
||||
.expect("we just added one");
|
||||
let real_next = self.pop_next_message().expect("Just stored one");
|
||||
|
||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||
}
|
||||
|
||||
Poll::Pending => {
|
||||
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
|
||||
if let Some(real_next) = self.pop_next_message() {
|
||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||
} else {
|
||||
// otherwise construct a dummy one
|
||||
@@ -411,16 +430,13 @@ where
|
||||
|
||||
// First store what we got for the given connection id
|
||||
self.transmission_buffer.store(&conn_id, real_messages);
|
||||
let real_next = self
|
||||
.transmission_buffer
|
||||
.pop_next_message_at_random()
|
||||
.expect("we just added one");
|
||||
let real_next = self.pop_next_message().expect("we just added one");
|
||||
|
||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||
}
|
||||
|
||||
Poll::Pending => {
|
||||
if let Some(real_next) = self.transmission_buffer.pop_next_message_at_random() {
|
||||
if let Some(real_next) = self.pop_next_message() {
|
||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||
} else {
|
||||
Poll::Pending
|
||||
|
||||
+7
-2
@@ -41,6 +41,10 @@ impl TransmissionBuffer {
|
||||
self.buffer.keys().count()
|
||||
}
|
||||
|
||||
pub(crate) fn lane_length(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||
self.buffer.get(lane).map(LaneBufferEntry::len)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn connections(&self) -> HashSet<u64> {
|
||||
self.buffer
|
||||
@@ -127,7 +131,7 @@ impl TransmissionBuffer {
|
||||
Some(real_next)
|
||||
}
|
||||
|
||||
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<RealMessage> {
|
||||
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
|
||||
if self.buffer.is_empty() {
|
||||
return None;
|
||||
}
|
||||
@@ -142,8 +146,9 @@ impl TransmissionBuffer {
|
||||
*self.pick_random_lane()?
|
||||
};
|
||||
|
||||
let msg = self.pop_front_from_lane(&lane)?;
|
||||
log::trace!("picking to send from lane: {:?}", lane);
|
||||
self.pop_front_from_lane(&lane)
|
||||
Some((lane, msg))
|
||||
}
|
||||
|
||||
pub(crate) fn prune_stale_connections(&mut self) {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, TransmissionLane};
|
||||
use client_connections::{
|
||||
ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||
use client_core::client::inbound_messages::{
|
||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||
@@ -119,6 +121,7 @@ impl NymClient {
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
closed_connection_rx: ClosedConnectionReceiver,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
@@ -149,6 +152,7 @@ impl NymClient {
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
lane_queue_lengths,
|
||||
closed_connection_rx,
|
||||
)
|
||||
.start_with_shutdown(shutdown);
|
||||
@@ -283,6 +287,7 @@ impl NymClient {
|
||||
&self,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
@@ -292,6 +297,7 @@ impl NymClient {
|
||||
closed_connection_tx,
|
||||
buffer_requester,
|
||||
&self.as_mix_recipient(),
|
||||
lane_queue_lengths,
|
||||
);
|
||||
|
||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||
@@ -417,12 +423,17 @@ impl NymClient {
|
||||
// controller that connections are closed.
|
||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
shared_lane_queue_lengths.clone(),
|
||||
closed_connection_rx,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
@@ -443,6 +454,7 @@ impl NymClient {
|
||||
SocketType::WebSocket => self.start_websocket_listener(
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
shared_lane_queue_lengths,
|
||||
closed_connection_tx,
|
||||
),
|
||||
SocketType::None => {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::{ClosedConnectionSender, TransmissionLane};
|
||||
use client_connections::{ClosedConnectionSender, LaneQueueLengths, TransmissionLane};
|
||||
use client_core::client::{
|
||||
inbound_messages::{InputMessage, InputMessageSender},
|
||||
received_buffer::{
|
||||
@@ -40,6 +40,7 @@ pub(crate) struct Handler {
|
||||
self_full_address: Recipient,
|
||||
socket: Option<WebSocketStream<TcpStream>>,
|
||||
received_response_type: ReceivedResponseType,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
}
|
||||
|
||||
// clone is used to use handler on a new connection, which initially is `None`
|
||||
@@ -52,6 +53,7 @@ impl Clone for Handler {
|
||||
self_full_address: self.self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -70,6 +72,7 @@ impl Handler {
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
self_full_address: &Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Self {
|
||||
Handler {
|
||||
msg_input,
|
||||
@@ -78,6 +81,7 @@ impl Handler {
|
||||
self_full_address: *self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
lane_queue_lengths,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +97,14 @@ impl Handler {
|
||||
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
|
||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
||||
|
||||
None
|
||||
// WIP(JON)
|
||||
let queue_lengh = self
|
||||
.lane_queue_lengths
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&lane)
|
||||
.unwrap_or(0);
|
||||
Some(ServerResponse::LaneQueueLength(connection_id, queue_lengh))
|
||||
}
|
||||
|
||||
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
||||
|
||||
@@ -9,7 +9,7 @@ use crate::socks::{
|
||||
authentication::{AuthenticationMethods, Authenticator, User},
|
||||
server::SphinxSocksServer,
|
||||
};
|
||||
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender};
|
||||
use client_connections::{ClosedConnectionReceiver, ClosedConnectionSender, LaneQueueLengths};
|
||||
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||
use client_core::client::inbound_messages::{
|
||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||
@@ -120,6 +120,7 @@ impl NymClient {
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
closed_connection_rx: ClosedConnectionReceiver,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let mut controller_config = client_core::client::real_messages_control::Config::new(
|
||||
@@ -149,6 +150,7 @@ impl NymClient {
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
lane_queue_lengths,
|
||||
closed_connection_rx,
|
||||
)
|
||||
.start_with_shutdown(shutdown);
|
||||
@@ -284,6 +286,7 @@ impl NymClient {
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting socks5 listener...");
|
||||
@@ -296,6 +299,7 @@ impl NymClient {
|
||||
authenticator,
|
||||
self.config.get_provider_mix_address(),
|
||||
self.as_mix_recipient(),
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
@@ -409,6 +413,10 @@ impl NymClient {
|
||||
// This will be forwarded to `OutQueueControl`
|
||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
@@ -416,6 +424,7 @@ impl NymClient {
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
closed_connection_rx,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
@@ -435,6 +444,7 @@ impl NymClient {
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
closed_connection_tx,
|
||||
shared_lane_queue_lengths,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use super::authentication::{AuthenticationMethods, Authenticator, User};
|
||||
use super::request::{SocksCommand, SocksRequest};
|
||||
use super::types::{ResponseCode, SocksProxyError};
|
||||
use super::{RESERVED, SOCKS_VERSION};
|
||||
use client_connections::TransmissionLane;
|
||||
use client_connections::{LaneQueueLengths, TransmissionLane};
|
||||
use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
|
||||
use futures::channel::mpsc;
|
||||
use futures::task::{Context, Poll};
|
||||
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio::{self, net::TcpStream};
|
||||
@@ -141,7 +142,9 @@ pub(crate) struct SocksClient {
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
started_proxy: bool,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown_listener: ShutdownListener,
|
||||
active_connections: Arc<std::sync::Mutex<u64>>,
|
||||
}
|
||||
|
||||
impl Drop for SocksClient {
|
||||
@@ -165,9 +168,17 @@ impl SocksClient {
|
||||
service_provider: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
self_address: Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown_listener: ShutdownListener,
|
||||
active_connections: Arc<std::sync::Mutex<u64>>,
|
||||
) -> Self {
|
||||
let connection_id = Self::generate_random();
|
||||
let u_active_connections = {
|
||||
let g = active_connections.lock().unwrap();
|
||||
*g
|
||||
};
|
||||
error!("client active_connections: {}", u_active_connections);
|
||||
|
||||
SocksClient {
|
||||
controller_sender,
|
||||
connection_id,
|
||||
@@ -179,7 +190,9 @@ impl SocksClient {
|
||||
service_provider,
|
||||
self_address,
|
||||
started_proxy: false,
|
||||
lane_queue_lengths,
|
||||
shutdown_listener,
|
||||
active_connections,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,6 +218,8 @@ impl SocksClient {
|
||||
/// is in use and that the client is authenticated, then runs the request.
|
||||
pub async fn run(&mut self) -> Result<(), SocksProxyError> {
|
||||
debug!("New connection from: {}", self.stream.peer_addr()?.ip());
|
||||
//dbg!(&self.stream.peer_addr());
|
||||
|
||||
let mut header = [0u8; 2];
|
||||
// Read a byte from the stream and determine the version being requested
|
||||
self.stream.read_exact(&mut header).await?;
|
||||
@@ -258,6 +273,7 @@ impl SocksClient {
|
||||
conn_receiver,
|
||||
input_sender,
|
||||
connection_id,
|
||||
self.lane_queue_lengths.clone(),
|
||||
self.shutdown_listener.clone(),
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
@@ -275,10 +291,24 @@ impl SocksClient {
|
||||
/// Handles a client request.
|
||||
async fn handle_request(&mut self) -> Result<(), SocksProxyError> {
|
||||
debug!("Handling CONNECT Command");
|
||||
let active_connections = {
|
||||
let g = self.active_connections.lock().unwrap();
|
||||
*g
|
||||
};
|
||||
error!("active_connections: {}", active_connections);
|
||||
|
||||
let request = SocksRequest::from_stream(&mut self.stream).await?;
|
||||
let remote_address = request.to_string();
|
||||
|
||||
if active_connections > 50 {
|
||||
log::warn!(
|
||||
"Refusing SOCKS5: too many connections: {}",
|
||||
active_connections
|
||||
);
|
||||
self.refuse_connection_socks5().await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// setup for receiving from the mixnet
|
||||
let (mix_sender, mix_receiver) = mpsc::unbounded();
|
||||
|
||||
@@ -332,6 +362,24 @@ impl SocksClient {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn refuse_connection_socks5(&mut self) {
|
||||
self.stream
|
||||
.write_all(&[
|
||||
SOCKS_VERSION,
|
||||
ResponseCode::RuleFailure as u8,
|
||||
RESERVED,
|
||||
1,
|
||||
127,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Authenticate the incoming request. Each request is checked for its
|
||||
/// authentication method. A user/password request will extract the
|
||||
/// username and password from the stream, then check with the Authenticator
|
||||
|
||||
@@ -4,7 +4,7 @@ use super::{
|
||||
mixnet_responses::MixnetResponseListener,
|
||||
types::{ResponseCode, SocksProxyError},
|
||||
};
|
||||
use client_connections::ClosedConnectionSender;
|
||||
use client_connections::{ClosedConnectionSender, LaneQueueLengths};
|
||||
use client_core::client::{
|
||||
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
|
||||
};
|
||||
@@ -12,6 +12,7 @@ use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::Controller;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
@@ -21,6 +22,7 @@ pub struct SphinxSocksServer {
|
||||
listening_address: SocketAddr,
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
@@ -31,6 +33,7 @@ impl SphinxSocksServer {
|
||||
authenticator: Authenticator,
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
|
||||
@@ -42,6 +45,7 @@ impl SphinxSocksServer {
|
||||
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
|
||||
service_provider,
|
||||
self_address,
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
@@ -74,10 +78,16 @@ impl SphinxSocksServer {
|
||||
mixnet_response_listener.run().await;
|
||||
});
|
||||
|
||||
let active_connections = Arc::new(std::sync::Mutex::new(0));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Ok((stream, _remote)) = listener.accept() => {
|
||||
// TODO Optimize this
|
||||
{
|
||||
let mut g = active_connections.lock().unwrap();
|
||||
*g += 1;
|
||||
}
|
||||
let mut client = SocksClient::new(
|
||||
stream,
|
||||
self.authenticator.clone(),
|
||||
@@ -85,9 +95,12 @@ impl SphinxSocksServer {
|
||||
self.service_provider,
|
||||
controller_sender.clone(),
|
||||
self.self_address,
|
||||
self.lane_queue_lengths.clone(),
|
||||
self.shutdown.clone(),
|
||||
active_connections.clone(),
|
||||
);
|
||||
|
||||
let active_connections = active_connections.clone();
|
||||
tokio::spawn(async move {
|
||||
{
|
||||
match client.run().await {
|
||||
@@ -118,6 +131,10 @@ impl SphinxSocksServer {
|
||||
};
|
||||
// client gets dropped here
|
||||
}
|
||||
{
|
||||
let mut g = active_connections.lock().unwrap();
|
||||
*g -= 1;
|
||||
}
|
||||
});
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
|
||||
@@ -7,3 +7,4 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3"
|
||||
log = "0.4.17"
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use futures::channel::mpsc;
|
||||
|
||||
pub type ConnectionId = u64;
|
||||
@@ -19,3 +21,81 @@ pub enum TransmissionLane {
|
||||
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
||||
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
|
||||
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
|
||||
|
||||
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||
// if needed.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
|
||||
|
||||
impl LaneQueueLengths {
|
||||
pub fn new() -> Self {
|
||||
LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
|
||||
LaneQueueLengthsInner {
|
||||
map: HashMap::new(),
|
||||
},
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
|
||||
match self.0.lock() {
|
||||
Ok(mut inner) => {
|
||||
if let Some(length) = lane_length {
|
||||
inner
|
||||
.map
|
||||
.entry(*lane)
|
||||
.and_modify(|e| *e = length)
|
||||
.or_insert(length);
|
||||
} else {
|
||||
inner.map.remove(lane);
|
||||
}
|
||||
}
|
||||
Err(err) => log::warn!("Failed to set lane queue length: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||
match self.0.lock() {
|
||||
Ok(inner) => inner.get(lane),
|
||||
Err(err) => {
|
||||
log::warn!("Failed to get lane queue length: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LaneQueueLengths {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for LaneQueueLengths {
|
||||
type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LaneQueueLengthsInner {
|
||||
pub map: HashMap<TransmissionLane, usize>,
|
||||
}
|
||||
|
||||
impl LaneQueueLengthsInner {
|
||||
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
|
||||
self.map.get(lane).copied()
|
||||
}
|
||||
|
||||
pub fn values(&self) -> impl Iterator<Item = &usize> {
|
||||
self.map.values()
|
||||
}
|
||||
|
||||
pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
|
||||
where
|
||||
F: FnOnce(&mut usize),
|
||||
{
|
||||
self.map.entry(*lane).and_modify(f);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,14 @@ use super::MixProxySender;
|
||||
use super::SHUTDOWN_TIMEOUT;
|
||||
use crate::available_reader::AvailableReader;
|
||||
use bytes::Bytes;
|
||||
use client_connections::LaneQueueLengths;
|
||||
use client_connections::TransmissionLane;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::OrderedMessageSender;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::time::Duration;
|
||||
use std::{io, sync::Arc};
|
||||
use task::ShutdownListener;
|
||||
use tokio::select;
|
||||
@@ -29,7 +32,7 @@ fn send_empty_close<F, S>(
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn deal_with_data<F, S>(
|
||||
async fn deal_with_data<F, S>(
|
||||
read_data: Option<io::Result<Bytes>>,
|
||||
local_destination_address: &str,
|
||||
remote_source_address: &str,
|
||||
@@ -37,6 +40,7 @@ fn deal_with_data<F, S>(
|
||||
message_sender: &mut OrderedMessageSender,
|
||||
mix_sender: &MixProxySender<S>,
|
||||
adapter_fn: F,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> bool
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
||||
@@ -67,6 +71,70 @@ where
|
||||
"pushing data down the input sender: size: {}",
|
||||
ordered_msg.len()
|
||||
);
|
||||
|
||||
// wait here until queue is not too long
|
||||
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||
//loop {
|
||||
{
|
||||
let (queue_length, est_busy_conn) = {
|
||||
let mut guard = lane_queue_lengths.lock().unwrap();
|
||||
//let queue_length = *guard.get(&lane).unwrap_or(&0);
|
||||
//let queue_length = *guard.entry(lane).or_insert(0);
|
||||
let queue_length = guard.get(&lane).unwrap_or(0);
|
||||
let est_busy_conn = guard.values().filter(|v| *v > &5).count();
|
||||
|
||||
// We estimate the queue length for subsequent data packet. This is needed
|
||||
// because there is a delay until we get the correct value back as a server
|
||||
// response. (And that will have a delay baked in).
|
||||
// WIP(JON): pull packet size from somewhere
|
||||
let sphinx_size = 2000.0;
|
||||
let msg_length = (ordered_msg.len() as f64 / sphinx_size).ceil() as usize;
|
||||
guard.modify(&lane, |length| *length += msg_length);
|
||||
|
||||
(queue_length, est_busy_conn)
|
||||
};
|
||||
|
||||
log::info!("conn_id: {connection_id}, queue: {queue_length}");
|
||||
// The heuristic here is:
|
||||
// 20ms average delay => 50 packets / sec
|
||||
// 500 packet queue => 10 sec behind
|
||||
// This assumes it's the only active connection, and that there is no throttling
|
||||
// In practive, this is a latency vs throughput tradeoff we're making here
|
||||
let avererage_delay = 0.02; // TODO: read from config
|
||||
let packets_per_sec = 1.0 / avererage_delay;
|
||||
let ideal_time_to_clear_queue = queue_length as f64 / packets_per_sec;
|
||||
if queue_length > 5000 {
|
||||
log::info!("sleeping long");
|
||||
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue * 5.0)).await;
|
||||
} else if queue_length > 500 {
|
||||
log::info!("sleeping medium");
|
||||
sleep(Duration::from_secs_f64(
|
||||
ideal_time_to_clear_queue * 2.0 / 3.0,
|
||||
))
|
||||
.await;
|
||||
} else if queue_length > 5 {
|
||||
log::info!("sleeping short");
|
||||
sleep(Duration::from_secs_f64(ideal_time_to_clear_queue / 3.0)).await;
|
||||
}
|
||||
|
||||
// If we are saturated on number of connections, and this is already a busy
|
||||
// connection, basically soft-stop
|
||||
//if est_busy_conn > 15 && queue_length > 5 {
|
||||
// log::info!("soft-stop: {connection_id}");
|
||||
// sleep(Duration::from_secs_f64(5.0 * 60.0)).await;
|
||||
//}
|
||||
|
||||
//loop {
|
||||
// let count = ACTIVE_PROXIES.load(Ordering::Relaxed);
|
||||
// if count + 1 > 15 {
|
||||
// log::info!("Max connections reached, parking: {conn_id}");
|
||||
// sleep(Duration::from_secs(10)).await;
|
||||
// } else {
|
||||
// break;
|
||||
// }
|
||||
//}
|
||||
}
|
||||
|
||||
mix_sender
|
||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||
.unwrap();
|
||||
@@ -88,6 +156,7 @@ pub(super) async fn run_inbound<F, S>(
|
||||
mix_sender: MixProxySender<S>,
|
||||
adapter_fn: F,
|
||||
shutdown_notify: Arc<Notify>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
mut shutdown_listener: ShutdownListener,
|
||||
) -> OwnedReadHalf
|
||||
where
|
||||
@@ -102,12 +171,24 @@ where
|
||||
loop {
|
||||
select! {
|
||||
read_data = &mut available_reader.next() => {
|
||||
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
|
||||
if deal_with_data(
|
||||
read_data,
|
||||
&local_destination_address,
|
||||
&remote_source_address,
|
||||
connection_id,
|
||||
&mut message_sender,
|
||||
&mix_sender,
|
||||
&adapter_fn,
|
||||
lane_queue_lengths.clone()
|
||||
).await {
|
||||
break
|
||||
}
|
||||
}
|
||||
_ = &mut shutdown_future => {
|
||||
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
|
||||
debug!(
|
||||
"closing inbound proxy after outbound was closed {:?} ago",
|
||||
SHUTDOWN_TIMEOUT
|
||||
);
|
||||
// inform remote just in case it was closed because of lack of heartbeat.
|
||||
// worst case the remote will just have couple of false negatives
|
||||
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::connection_controller::ConnectionReceiver;
|
||||
use client_connections::LaneQueueLengths;
|
||||
use futures::channel::mpsc;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
@@ -12,7 +13,7 @@ mod inbound;
|
||||
mod outbound;
|
||||
|
||||
// TODO: make this configurable
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProxyMessage {
|
||||
@@ -45,6 +46,7 @@ pub struct ProxyRunner<S> {
|
||||
local_destination_address: String,
|
||||
remote_source_address: String,
|
||||
connection_id: ConnectionId,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
|
||||
// Listens to shutdown commands from higher up
|
||||
shutdown_listener: ShutdownListener,
|
||||
@@ -61,6 +63,7 @@ where
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: MixProxySender<S>,
|
||||
connection_id: ConnectionId,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown_listener: ShutdownListener,
|
||||
) -> Self {
|
||||
ProxyRunner {
|
||||
@@ -70,6 +73,7 @@ where
|
||||
local_destination_address,
|
||||
remote_source_address,
|
||||
connection_id,
|
||||
lane_queue_lengths,
|
||||
shutdown_listener,
|
||||
}
|
||||
}
|
||||
@@ -78,7 +82,7 @@ where
|
||||
// request/response as required by entity running particular side of the proxy.
|
||||
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
|
||||
{
|
||||
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
||||
let shutdown_notify = Arc::new(Notify::new());
|
||||
@@ -92,6 +96,7 @@ where
|
||||
self.mix_sender.clone(),
|
||||
adapter_fn,
|
||||
Arc::clone(&shutdown_notify),
|
||||
self.lane_queue_lengths.clone(),
|
||||
self.shutdown_listener.clone(),
|
||||
);
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ use tokio::select;
|
||||
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
|
||||
|
||||
const MIX_TTL: Duration = Duration::from_secs(5 * 60);
|
||||
//const MIX_TTL: Duration = Duration::from_secs(15 * 60);
|
||||
|
||||
async fn deal_with_message(
|
||||
connection_message: ConnectionMessage,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::LaneQueueLengths;
|
||||
use futures::channel::mpsc;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::ConnectionReceiver;
|
||||
@@ -41,6 +42,7 @@ impl Connection {
|
||||
&mut self,
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let stream = self.conn.take().unwrap();
|
||||
@@ -54,6 +56,7 @@ impl Connection {
|
||||
mix_receiver,
|
||||
mix_sender,
|
||||
connection_id,
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::error::NetworkRequesterError;
|
||||
use crate::statistics::ServiceStatisticsCollector;
|
||||
use crate::websocket;
|
||||
use crate::websocket::TSWebsocketStream;
|
||||
use client_connections::ClosedConnectionReceiver;
|
||||
use client_connections::{ClosedConnectionReceiver, LaneQueueLengths, TransmissionLane};
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
@@ -118,6 +118,7 @@ impl ServiceProvider {
|
||||
|
||||
async fn read_websocket_message(
|
||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Option<ReconstructedMessage> {
|
||||
while let Some(msg) = websocket_reader.next().await {
|
||||
let data = msg
|
||||
@@ -138,6 +139,15 @@ impl ServiceProvider {
|
||||
|
||||
let received = match deserialized_message {
|
||||
ServerResponse::Received(received) => received,
|
||||
ServerResponse::LaneQueueLength(lane, queue_length) => {
|
||||
log::info!(
|
||||
"received LaneQueueLength lane: {lane}, queue_length: {queue_length}"
|
||||
);
|
||||
let lane = TransmissionLane::ConnectionId(lane);
|
||||
let mut guard = lane_queue_lengths.lock().unwrap();
|
||||
guard.map.insert(lane, queue_length);
|
||||
continue;
|
||||
}
|
||||
ServerResponse::Error(err) => {
|
||||
panic!("received error from native client! - {}", err)
|
||||
}
|
||||
@@ -154,6 +164,7 @@ impl ServiceProvider {
|
||||
return_address: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||
@@ -191,7 +202,7 @@ impl ServiceProvider {
|
||||
);
|
||||
|
||||
// run the proxy on the connection
|
||||
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
|
||||
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
|
||||
.await;
|
||||
|
||||
// proxy is done - remove the access channel from the controller
|
||||
@@ -207,6 +218,7 @@ impl ServiceProvider {
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_proxy_connect(
|
||||
&mut self,
|
||||
controller_sender: &mut ControllerSender,
|
||||
@@ -214,6 +226,7 @@ impl ServiceProvider {
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
return_address: Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
||||
@@ -241,6 +254,7 @@ impl ServiceProvider {
|
||||
return_address,
|
||||
controller_sender_clone,
|
||||
mix_input_sender_clone,
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
)
|
||||
.await
|
||||
@@ -265,6 +279,7 @@ impl ServiceProvider {
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
|
||||
@@ -290,6 +305,7 @@ impl ServiceProvider {
|
||||
req.conn_id,
|
||||
req.remote_addr,
|
||||
req.return_address,
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
)
|
||||
}
|
||||
@@ -336,6 +352,10 @@ impl ServiceProvider {
|
||||
// `ClientRequest`.
|
||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
// requester shutdown signalling is not yet fully implemented.
|
||||
@@ -376,7 +396,12 @@ impl ServiceProvider {
|
||||
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||
loop {
|
||||
let received = match Self::read_websocket_message(&mut websocket_reader).await {
|
||||
let received = match Self::read_websocket_message(
|
||||
&mut websocket_reader,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(msg) => msg,
|
||||
None => {
|
||||
error!("The websocket stream has finished!");
|
||||
@@ -392,6 +417,7 @@ impl ServiceProvider {
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
stats_collector.clone(),
|
||||
shared_lane_queue_lengths.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
|
||||
Reference in New Issue
Block a user