Compare commits

...

9 Commits

Author SHA1 Message Date
Jon Häggblad c61eb014c5 Add timeouts 2022-11-22 23:25:06 +01:00
Jon Häggblad c10f788a89 getting lane queue lengths in place 2022-11-22 23:07:00 +01:00
Jon Häggblad 73fa863a6a WIP 2022-11-22 21:55:33 +01:00
Jon Häggblad a561f1b854 Pass lane queue lengths to inbound future 2022-11-22 13:58:53 +01:00
Jon Häggblad 1c379fa0db WIP 2022-11-22 11:49:35 +01:00
Jon Häggblad 31778c6e51 WIP 2022-11-22 11:13:38 +01:00
Jon Häggblad 9d0562168e WIP 2022-11-22 11:00:52 +01:00
Jon Häggblad 701f603825 FIX 2022-11-22 10:37:48 +01:00
Jon Häggblad 97b250433e WIP 2022-11-22 10:30:16 +01:00
10 changed files with 214 additions and 41 deletions
@@ -5,7 +5,7 @@ use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use client_connections::{
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
ClosedConnectionReceiver, ConnectionCommand, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
@@ -336,7 +336,10 @@ where
// NOTE: this feels a bit iffy, the `OutQueueControl` is getting ripe for a rewrite to
// something simpler.
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
self.on_close_connection(id);
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
if let Some(ref mut next_delay) = &mut self.next_delay {
@@ -412,7 +415,10 @@ where
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// Start by checking if we have any incoming messages about closed connections
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
self.on_close_connection(id);
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
+5 -2
View File
@@ -288,6 +288,7 @@ impl NymClient {
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
shared_lane_queue_lengths: LaneQueueLengths,
closed_connection_tx: ClosedConnectionSender,
) {
info!("Starting websocket listener...");
@@ -297,6 +298,7 @@ impl NymClient {
closed_connection_tx,
buffer_requester,
&self.as_mix_recipient(),
shared_lane_queue_lengths,
);
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
@@ -442,7 +444,7 @@ impl NymClient {
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
let shared_lane_queue_length = LaneQueueLengths::new();
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
@@ -450,7 +452,7 @@ impl NymClient {
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
shared_lane_queue_length,
shared_lane_queue_lengths.clone(),
closed_connection_rx,
shutdown.subscribe(),
);
@@ -471,6 +473,7 @@ impl NymClient {
SocketType::WebSocket => self.start_websocket_listener(
received_buffer_request_sender,
input_sender,
shared_lane_queue_lengths,
closed_connection_tx,
),
SocketType::None => {
+31 -2
View File
@@ -1,7 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::{ClosedConnectionSender, TransmissionLane};
use client_connections::{
ClosedConnectionSender, ConnectionCommand, LaneQueueLengths, TransmissionLane,
};
use client_core::client::{
inbound_messages::{InputMessage, InputMessageSender},
received_buffer::{
@@ -40,6 +42,7 @@ pub(crate) struct Handler {
self_full_address: Recipient,
socket: Option<WebSocketStream<TcpStream>>,
received_response_type: ReceivedResponseType,
lane_queue_lengths: LaneQueueLengths,
}
// clone is used to use handler on a new connection, which initially is `None`
@@ -52,6 +55,7 @@ impl Clone for Handler {
self_full_address: self.self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths: self.lane_queue_lengths.clone(),
}
}
}
@@ -70,6 +74,7 @@ impl Handler {
closed_connection_tx: ClosedConnectionSender,
buffer_requester: ReceivedBufferRequestSender,
self_full_address: &Recipient,
lane_queue_lengths: LaneQueueLengths,
) -> Self {
Handler {
msg_input,
@@ -78,6 +83,7 @@ impl Handler {
self_full_address: *self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths,
}
}
@@ -95,6 +101,15 @@ impl Handler {
panic!();
}
// on receiving a send, we reply back the current lane queue length for that connection id.
// Note that this does _NOT_ take into account the packets that have been received but not
// yet reach `OutQueueControl`, so it might be a tad low.
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
return Some(ServerResponse::LaneQueueLength(connection_id, queue_length));
}
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
None
}
@@ -121,11 +136,24 @@ impl Handler {
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
self.closed_connection_tx
.unbounded_send(connection_id)
.unbounded_send(ConnectionCommand::Close(connection_id))
.unwrap();
None
}
fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
log::warn!(
"Failed to get the lane queue length lock, not responding back with the current queue length"
);
return None;
};
let lane = TransmissionLane::ConnectionId(connection_id);
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
Some(ServerResponse::LaneQueueLength(connection_id, queue_length))
}
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
match request {
ClientRequest::Send {
@@ -143,6 +171,7 @@ impl Handler {
} => self.handle_reply(reply_surb, message).await,
ClientRequest::SelfAddress => Some(self.handle_self_address()),
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id),
}
}
@@ -23,6 +23,9 @@ pub const SELF_ADDRESS_REQUEST_TAG: u8 = 0x02;
/// Value tag representing [`ClosedConnection`] variant of the [`ClientRequest`]
pub const CLOSED_CONNECTION_REQUEST_TAG: u8 = 0x03;
/// Value tag representing [`GetLaneQueueLength`] variant of the [`ClientRequest`]
pub const GET_LANE_QUEUE_LENGHT_TAG: u8 = 0x04;
#[allow(non_snake_case)]
#[derive(Debug)]
pub enum ClientRequest {
@@ -39,6 +42,7 @@ pub enum ClientRequest {
},
SelfAddress,
ClosedConnection(u64),
GetLaneQueueLength(u64),
}
// we could have been parsing it directly TryFrom<WsMessage>, but we want to retain
@@ -241,6 +245,26 @@ impl ClientRequest {
ClientRequest::ClosedConnection(connection_id)
}
// GET_LANE_QUEUE_LENGHT_TAG
fn serialize_get_lane_queue_lengths(connection_id: u64) -> Vec<u8> {
let conn_id_bytes = connection_id.to_be_bytes();
std::iter::once(GET_LANE_QUEUE_LENGHT_TAG)
.chain(conn_id_bytes.iter().copied())
.collect()
}
// GET_LANE_QUEUE_LENGHT_TAG
fn deserialize_get_lane_queue_length(b: &[u8]) -> Self {
// this MUST match because it was called by 'deserialize'
debug_assert_eq!(b[0], GET_LANE_QUEUE_LENGHT_TAG);
let mut connection_id_bytes = [0u8; size_of::<u64>()];
connection_id_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
let connection_id = u64::from_be_bytes(connection_id_bytes);
ClientRequest::GetLaneQueueLength(connection_id)
}
pub fn serialize(self) -> Vec<u8> {
match self {
ClientRequest::Send {
@@ -258,6 +282,8 @@ impl ClientRequest {
ClientRequest::SelfAddress => Self::serialize_self_address(),
ClientRequest::ClosedConnection(id) => Self::serialize_closed_connection(id),
ClientRequest::GetLaneQueueLength(id) => Self::serialize_get_lane_queue_lengths(id),
}
}
@@ -288,6 +314,7 @@ impl ClientRequest {
REPLY_REQUEST_TAG => Self::deserialize_reply(b),
SELF_ADDRESS_REQUEST_TAG => Ok(Self::deserialize_self_address(b)),
CLOSED_CONNECTION_REQUEST_TAG => Ok(Self::deserialize_closed_connection(b)),
GET_LANE_QUEUE_LENGHT_TAG => Ok(Self::deserialize_get_lane_queue_length(b)),
n => Err(error::Error::new(
ErrorKind::UnknownRequest,
format!("type {n}"),
+1 -1
View File
@@ -62,7 +62,7 @@ impl SphinxSocksServer {
// controller for managing all active connections
let (mut active_streams_controller, controller_sender) =
Controller::new(closed_connection_tx, self.shutdown.clone());
Controller::new(closed_connection_tx, false, self.shutdown.clone());
tokio::spawn(async move {
active_streams_controller.run().await;
});
+14 -2
View File
@@ -19,8 +19,20 @@ pub enum TransmissionLane {
/// Announce connections that are closed, for whoever is interested.
/// One usecase is that the network-requester and socks5-client wants to know about this, so that
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionCommand>;
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionCommand>;
pub enum ConnectionCommand {
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
// transmission lanes.
Close(ConnectionId),
// In the network requester for example, we usually want to broadcast active connections
// regularly, so we know what connections we need to request lane queue lengths for from the
// client.
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
ActiveConnections(Vec<ConnectionId>),
}
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
// if needed.
@@ -1,14 +1,18 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::ClosedConnectionSender;
use client_connections::{ClosedConnectionSender, ConnectionCommand};
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
use socks5_requests::ConnectionId;
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use task::ShutdownListener;
use tokio::time;
/// A generic message produced after reading from a socket/connection. It includes data that was
/// actually read alongside boolean indicating whether the connection got closed so that
@@ -77,6 +81,8 @@ pub struct Controller {
// Broadcast closed connections
closed_connection_tx: ClosedConnectionSender,
broadcast_connections: bool,
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
// how to handle it more gracefully
@@ -90,6 +96,7 @@ pub struct Controller {
impl Controller {
pub fn new(
closed_connection_tx: ClosedConnectionSender,
broadcast_connections: bool,
shutdown: ShutdownListener,
) -> (Self, ControllerSender) {
let (sender, receiver) = mpsc::unbounded();
@@ -99,6 +106,7 @@ impl Controller {
receiver,
recently_closed: HashSet::new(),
closed_connection_tx,
broadcast_connections,
pending_messages: HashMap::new(),
shutdown,
},
@@ -137,7 +145,18 @@ impl Controller {
self.recently_closed.insert(conn_id);
// Announce closed connections, currently used by the `OutQueueControl`.
self.closed_connection_tx.unbounded_send(conn_id).unwrap();
self.closed_connection_tx
.unbounded_send(ConnectionCommand::Close(conn_id))
.unwrap();
}
fn broadcast_active_connections(&mut self) {
// What about the recently closed ones? Hopefully we can ignore them ...
let conn_ids = self.active_connections.keys().copied().collect();
self.closed_connection_tx
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
.unwrap();
}
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
@@ -196,6 +215,8 @@ impl Controller {
}
pub async fn run(&mut self) {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
command = self.receiver.next() => match command {
@@ -210,7 +231,12 @@ impl Controller {
log::trace!("SOCKS5 Controller: Stopping since channel closed");
break;
}
}
},
_ = interval.tick() => {
if self.broadcast_connections {
self.broadcast_active_connections();
}
},
}
}
assert!(self.shutdown.is_shutdown_poll());
@@ -106,23 +106,37 @@ where
}
async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
wait_for_lane(
lane_queue_lengths,
connection_id,
0,
Duration::from_millis(500),
if tokio::time::timeout(
Duration::from_secs(15),
wait_for_lane(
lane_queue_lengths,
connection_id,
0,
Duration::from_millis(500),
),
)
.await
.is_err()
{
log::warn!("Wait until lane empty timed out");
}
}
async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
wait_for_lane(
lane_queue_lengths,
connection_id,
10,
Duration::from_millis(100),
if tokio::time::timeout(
Duration::from_secs(15),
wait_for_lane(
lane_queue_lengths,
connection_id,
10,
Duration::from_millis(100),
),
)
.await
.is_err()
{
log::warn!("Wait until lane almost empty timed out");
}
}
async fn wait_for_lane(
@@ -1,6 +1,7 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::LaneQueueLengths;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::ConnectionReceiver;
use proxy_helpers::proxy_runner::{MixProxySender, ProxyRunner};
@@ -40,6 +41,7 @@ impl Connection {
&mut self,
mix_receiver: ConnectionReceiver,
mix_sender: MixProxySender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let stream = self.conn.take().unwrap();
@@ -53,7 +55,7 @@ impl Connection {
mix_receiver,
mix_sender,
connection_id,
None,
Some(lane_queue_lengths),
shutdown,
)
.run(move |conn_id, read_data, socket_closed| {
+70 -16
View File
@@ -7,7 +7,9 @@ use crate::error::NetworkRequesterError;
use crate::statistics::ServiceStatisticsCollector;
use crate::websocket;
use crate::websocket::TSWebsocketStream;
use client_connections::ClosedConnectionReceiver;
use client_connections::{
ClosedConnectionReceiver, ConnectionCommand, LaneQueueLengths, TransmissionLane,
};
use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
@@ -108,17 +110,49 @@ impl ServiceProvider {
break;
}
},
Some(id) = closed_connection_rx.next() => {
let msg = ClientRequest::ClosedConnection(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
Some(command) = closed_connection_rx.next() => {
match command {
ConnectionCommand::Close(id) => {
let msg = ClientRequest::ClosedConnection(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
ConnectionCommand::ActiveConnections(ids) => {
// We can optimize this by sending a single request, but this is
// usually in the low single digits, max a few tens, so we leave that
// for a rainy day.
// Also that means fiddling with the currently manual
// serialize/deserialize we do with ClientRequests ... bleh
for id in ids {
log::info!("Requesting lane queue length for: {}", id);
let msg = ClientRequest::GetLaneQueueLength(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
}
}
},
}
}
}
fn handle_lane_queue_length_response(
lane_queue_lengths: &LaneQueueLengths,
lane: u64,
queue_length: usize,
) {
log::info!("received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
let lane = TransmissionLane::ConnectionId(lane);
lane_queue_lengths.map.insert(lane, queue_length);
} else {
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
}
}
async fn read_websocket_message(
websocket_reader: &mut SplitStream<TSWebsocketStream>,
lane_queue_lengths: LaneQueueLengths,
) -> Option<ReconstructedMessage> {
while let Some(msg) = websocket_reader.next().await {
let data = msg
@@ -139,6 +173,14 @@ impl ServiceProvider {
let received = match deserialized_message {
ServerResponse::Received(received) => received,
ServerResponse::LaneQueueLength(lane, queue_length) => {
Self::handle_lane_queue_length_response(
&lane_queue_lengths,
lane,
queue_length,
);
continue;
}
ServerResponse::Error(err) => {
panic!("received error from native client! - {}", err)
}
@@ -155,6 +197,7 @@ impl ServiceProvider {
return_address: Recipient,
controller_sender: ControllerSender,
mix_input_sender: MixProxySender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
@@ -196,7 +239,7 @@ impl ServiceProvider {
);
// run the proxy on the connection
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
.await;
// proxy is done - remove the access channel from the controller
@@ -212,10 +255,12 @@ impl ServiceProvider {
);
}
#[allow(clippy::too_many_arguments)]
async fn handle_proxy_connect(
&mut self,
controller_sender: &mut ControllerSender,
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
conn_id: ConnectionId,
remote_addr: String,
return_address: Recipient,
@@ -250,6 +295,7 @@ impl ServiceProvider {
return_address,
controller_sender_clone,
mix_input_sender_clone,
lane_queue_lengths,
shutdown,
)
.await
@@ -257,7 +303,6 @@ impl ServiceProvider {
}
fn handle_proxy_send(
&self,
controller_sender: &mut ControllerSender,
conn_id: ConnectionId,
data: Vec<u8>,
@@ -273,6 +318,7 @@ impl ServiceProvider {
raw_request: &[u8],
controller_sender: &mut ControllerSender,
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
lane_queue_lengths: LaneQueueLengths,
stats_collector: Option<ServiceStatisticsCollector>,
shutdown: ShutdownListener,
) {
@@ -296,6 +342,7 @@ impl ServiceProvider {
self.handle_proxy_connect(
controller_sender,
mix_input_sender,
lane_queue_lengths,
req.conn_id,
req.remote_addr,
req.return_address,
@@ -319,7 +366,7 @@ impl ServiceProvider {
.processed(remote_addr, data.len() as u32);
}
}
self.handle_proxy_send(controller_sender, conn_id, data, closed)
Self::handle_proxy_send(controller_sender, conn_id, data, closed)
}
},
Socks5Message::Response(_) | Socks5Message::NetworkRequesterResponse(_) => {}
@@ -346,11 +393,15 @@ impl ServiceProvider {
// `ClientRequest`.
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented.
let (mut active_connections_controller, mut controller_sender) =
Controller::new(closed_connection_tx, shutdown.subscribe());
Controller::new(closed_connection_tx, true, shutdown.subscribe());
tokio::spawn(async move {
active_connections_controller.run().await;
@@ -386,12 +437,14 @@ impl ServiceProvider {
println!("\nAll systems go. Press CTRL-C to stop the server.");
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
loop {
let received = match Self::read_websocket_message(&mut websocket_reader).await {
Some(msg) => msg,
None => {
error!("The websocket stream has finished!");
return Ok(());
}
let Some(received) = Self::read_websocket_message(
&mut websocket_reader,
shared_lane_queue_lengths.clone()
)
.await
else {
log::error!("The websocket stream has finished!");
return Ok(());
};
let raw_message = received.message;
@@ -401,6 +454,7 @@ impl ServiceProvider {
&raw_message,
&mut controller_sender,
&mix_input_sender,
shared_lane_queue_lengths.clone(),
stats_collector.clone(),
shutdown.subscribe(),
)