Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c61eb014c5 | |||
| c10f788a89 | |||
| 73fa863a6a | |||
| a561f1b854 | |||
| 1c379fa0db | |||
| 31778c6e51 | |||
| 9d0562168e | |||
| 701f603825 | |||
| 97b250433e |
@@ -5,7 +5,7 @@ use crate::client::mix_traffic::BatchMixMessageSender;
|
|||||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||||
use crate::client::topology_control::TopologyAccessor;
|
use crate::client::topology_control::TopologyAccessor;
|
||||||
use client_connections::{
|
use client_connections::{
|
||||||
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
ClosedConnectionReceiver, ConnectionCommand, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||||
};
|
};
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use futures::{Future, Stream, StreamExt};
|
use futures::{Future, Stream, StreamExt};
|
||||||
@@ -336,7 +336,10 @@ where
|
|||||||
// NOTE: this feels a bit iffy, the `OutQueueControl` is getting ripe for a rewrite to
|
// NOTE: this feels a bit iffy, the `OutQueueControl` is getting ripe for a rewrite to
|
||||||
// something simpler.
|
// something simpler.
|
||||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
||||||
self.on_close_connection(id);
|
match id {
|
||||||
|
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||||
|
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref mut next_delay) = &mut self.next_delay {
|
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||||
@@ -412,7 +415,10 @@ where
|
|||||||
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
// Start by checking if we have any incoming messages about closed connections
|
// Start by checking if we have any incoming messages about closed connections
|
||||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
||||||
self.on_close_connection(id);
|
match id {
|
||||||
|
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||||
|
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||||
|
|||||||
@@ -288,6 +288,7 @@ impl NymClient {
|
|||||||
&self,
|
&self,
|
||||||
buffer_requester: ReceivedBufferRequestSender,
|
buffer_requester: ReceivedBufferRequestSender,
|
||||||
msg_input: InputMessageSender,
|
msg_input: InputMessageSender,
|
||||||
|
shared_lane_queue_lengths: LaneQueueLengths,
|
||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
) {
|
) {
|
||||||
info!("Starting websocket listener...");
|
info!("Starting websocket listener...");
|
||||||
@@ -297,6 +298,7 @@ impl NymClient {
|
|||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
buffer_requester,
|
buffer_requester,
|
||||||
&self.as_mix_recipient(),
|
&self.as_mix_recipient(),
|
||||||
|
shared_lane_queue_lengths,
|
||||||
);
|
);
|
||||||
|
|
||||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||||
@@ -442,7 +444,7 @@ impl NymClient {
|
|||||||
|
|
||||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||||
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
|
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
|
||||||
let shared_lane_queue_length = LaneQueueLengths::new();
|
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
@@ -450,7 +452,7 @@ impl NymClient {
|
|||||||
ack_receiver,
|
ack_receiver,
|
||||||
input_receiver,
|
input_receiver,
|
||||||
sphinx_message_sender.clone(),
|
sphinx_message_sender.clone(),
|
||||||
shared_lane_queue_length,
|
shared_lane_queue_lengths.clone(),
|
||||||
closed_connection_rx,
|
closed_connection_rx,
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
);
|
);
|
||||||
@@ -471,6 +473,7 @@ impl NymClient {
|
|||||||
SocketType::WebSocket => self.start_websocket_listener(
|
SocketType::WebSocket => self.start_websocket_listener(
|
||||||
received_buffer_request_sender,
|
received_buffer_request_sender,
|
||||||
input_sender,
|
input_sender,
|
||||||
|
shared_lane_queue_lengths,
|
||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
),
|
),
|
||||||
SocketType::None => {
|
SocketType::None => {
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use client_connections::{ClosedConnectionSender, TransmissionLane};
|
use client_connections::{
|
||||||
|
ClosedConnectionSender, ConnectionCommand, LaneQueueLengths, TransmissionLane,
|
||||||
|
};
|
||||||
use client_core::client::{
|
use client_core::client::{
|
||||||
inbound_messages::{InputMessage, InputMessageSender},
|
inbound_messages::{InputMessage, InputMessageSender},
|
||||||
received_buffer::{
|
received_buffer::{
|
||||||
@@ -40,6 +42,7 @@ pub(crate) struct Handler {
|
|||||||
self_full_address: Recipient,
|
self_full_address: Recipient,
|
||||||
socket: Option<WebSocketStream<TcpStream>>,
|
socket: Option<WebSocketStream<TcpStream>>,
|
||||||
received_response_type: ReceivedResponseType,
|
received_response_type: ReceivedResponseType,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone is used to use handler on a new connection, which initially is `None`
|
// clone is used to use handler on a new connection, which initially is `None`
|
||||||
@@ -52,6 +55,7 @@ impl Clone for Handler {
|
|||||||
self_full_address: self.self_full_address,
|
self_full_address: self.self_full_address,
|
||||||
socket: None,
|
socket: None,
|
||||||
received_response_type: Default::default(),
|
received_response_type: Default::default(),
|
||||||
|
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -70,6 +74,7 @@ impl Handler {
|
|||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
buffer_requester: ReceivedBufferRequestSender,
|
buffer_requester: ReceivedBufferRequestSender,
|
||||||
self_full_address: &Recipient,
|
self_full_address: &Recipient,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Handler {
|
Handler {
|
||||||
msg_input,
|
msg_input,
|
||||||
@@ -78,6 +83,7 @@ impl Handler {
|
|||||||
self_full_address: *self_full_address,
|
self_full_address: *self_full_address,
|
||||||
socket: None,
|
socket: None,
|
||||||
received_response_type: Default::default(),
|
received_response_type: Default::default(),
|
||||||
|
lane_queue_lengths,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,6 +101,15 @@ impl Handler {
|
|||||||
panic!();
|
panic!();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// on receiving a send, we reply back the current lane queue length for that connection id.
|
||||||
|
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||||
|
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||||
|
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
|
||||||
|
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||||
|
return Some(ServerResponse::LaneQueueLength(connection_id, queue_length));
|
||||||
|
}
|
||||||
|
|
||||||
|
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,11 +136,24 @@ impl Handler {
|
|||||||
|
|
||||||
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
|
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||||
self.closed_connection_tx
|
self.closed_connection_tx
|
||||||
.unbounded_send(connection_id)
|
.unbounded_send(ConnectionCommand::Close(connection_id))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||||
|
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
|
||||||
|
log::warn!(
|
||||||
|
"Failed to get the lane queue length lock, not responding back with the current queue length"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||||
|
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||||
|
Some(ServerResponse::LaneQueueLength(connection_id, queue_length))
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||||
match request {
|
match request {
|
||||||
ClientRequest::Send {
|
ClientRequest::Send {
|
||||||
@@ -143,6 +171,7 @@ impl Handler {
|
|||||||
} => self.handle_reply(reply_surb, message).await,
|
} => self.handle_reply(reply_surb, message).await,
|
||||||
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
||||||
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
|
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
|
||||||
|
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,9 @@ pub const SELF_ADDRESS_REQUEST_TAG: u8 = 0x02;
|
|||||||
/// Value tag representing [`ClosedConnection`] variant of the [`ClientRequest`]
|
/// Value tag representing [`ClosedConnection`] variant of the [`ClientRequest`]
|
||||||
pub const CLOSED_CONNECTION_REQUEST_TAG: u8 = 0x03;
|
pub const CLOSED_CONNECTION_REQUEST_TAG: u8 = 0x03;
|
||||||
|
|
||||||
|
/// Value tag representing [`GetLaneQueueLength`] variant of the [`ClientRequest`]
|
||||||
|
pub const GET_LANE_QUEUE_LENGHT_TAG: u8 = 0x04;
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ClientRequest {
|
pub enum ClientRequest {
|
||||||
@@ -39,6 +42,7 @@ pub enum ClientRequest {
|
|||||||
},
|
},
|
||||||
SelfAddress,
|
SelfAddress,
|
||||||
ClosedConnection(u64),
|
ClosedConnection(u64),
|
||||||
|
GetLaneQueueLength(u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
// we could have been parsing it directly TryFrom<WsMessage>, but we want to retain
|
// we could have been parsing it directly TryFrom<WsMessage>, but we want to retain
|
||||||
@@ -241,6 +245,26 @@ impl ClientRequest {
|
|||||||
ClientRequest::ClosedConnection(connection_id)
|
ClientRequest::ClosedConnection(connection_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GET_LANE_QUEUE_LENGHT_TAG
|
||||||
|
fn serialize_get_lane_queue_lengths(connection_id: u64) -> Vec<u8> {
|
||||||
|
let conn_id_bytes = connection_id.to_be_bytes();
|
||||||
|
std::iter::once(GET_LANE_QUEUE_LENGHT_TAG)
|
||||||
|
.chain(conn_id_bytes.iter().copied())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GET_LANE_QUEUE_LENGHT_TAG
|
||||||
|
fn deserialize_get_lane_queue_length(b: &[u8]) -> Self {
|
||||||
|
// this MUST match because it was called by 'deserialize'
|
||||||
|
debug_assert_eq!(b[0], GET_LANE_QUEUE_LENGHT_TAG);
|
||||||
|
|
||||||
|
let mut connection_id_bytes = [0u8; size_of::<u64>()];
|
||||||
|
connection_id_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
|
||||||
|
let connection_id = u64::from_be_bytes(connection_id_bytes);
|
||||||
|
|
||||||
|
ClientRequest::GetLaneQueueLength(connection_id)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn serialize(self) -> Vec<u8> {
|
pub fn serialize(self) -> Vec<u8> {
|
||||||
match self {
|
match self {
|
||||||
ClientRequest::Send {
|
ClientRequest::Send {
|
||||||
@@ -258,6 +282,8 @@ impl ClientRequest {
|
|||||||
ClientRequest::SelfAddress => Self::serialize_self_address(),
|
ClientRequest::SelfAddress => Self::serialize_self_address(),
|
||||||
|
|
||||||
ClientRequest::ClosedConnection(id) => Self::serialize_closed_connection(id),
|
ClientRequest::ClosedConnection(id) => Self::serialize_closed_connection(id),
|
||||||
|
|
||||||
|
ClientRequest::GetLaneQueueLength(id) => Self::serialize_get_lane_queue_lengths(id),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,6 +314,7 @@ impl ClientRequest {
|
|||||||
REPLY_REQUEST_TAG => Self::deserialize_reply(b),
|
REPLY_REQUEST_TAG => Self::deserialize_reply(b),
|
||||||
SELF_ADDRESS_REQUEST_TAG => Ok(Self::deserialize_self_address(b)),
|
SELF_ADDRESS_REQUEST_TAG => Ok(Self::deserialize_self_address(b)),
|
||||||
CLOSED_CONNECTION_REQUEST_TAG => Ok(Self::deserialize_closed_connection(b)),
|
CLOSED_CONNECTION_REQUEST_TAG => Ok(Self::deserialize_closed_connection(b)),
|
||||||
|
GET_LANE_QUEUE_LENGHT_TAG => Ok(Self::deserialize_get_lane_queue_length(b)),
|
||||||
n => Err(error::Error::new(
|
n => Err(error::Error::new(
|
||||||
ErrorKind::UnknownRequest,
|
ErrorKind::UnknownRequest,
|
||||||
format!("type {n}"),
|
format!("type {n}"),
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ impl SphinxSocksServer {
|
|||||||
|
|
||||||
// controller for managing all active connections
|
// controller for managing all active connections
|
||||||
let (mut active_streams_controller, controller_sender) =
|
let (mut active_streams_controller, controller_sender) =
|
||||||
Controller::new(closed_connection_tx, self.shutdown.clone());
|
Controller::new(closed_connection_tx, false, self.shutdown.clone());
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
active_streams_controller.run().await;
|
active_streams_controller.run().await;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -19,8 +19,20 @@ pub enum TransmissionLane {
|
|||||||
/// Announce connections that are closed, for whoever is interested.
|
/// Announce connections that are closed, for whoever is interested.
|
||||||
/// One usecase is that the network-requester and socks5-client wants to know about this, so that
|
/// One usecase is that the network-requester and socks5-client wants to know about this, so that
|
||||||
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
||||||
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
|
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionCommand>;
|
||||||
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
|
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionCommand>;
|
||||||
|
|
||||||
|
pub enum ConnectionCommand {
|
||||||
|
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
|
||||||
|
// transmission lanes.
|
||||||
|
Close(ConnectionId),
|
||||||
|
|
||||||
|
// In the network requester for example, we usually want to broadcast active connections
|
||||||
|
// regularly, so we know what connections we need to request lane queue lengths for from the
|
||||||
|
// client.
|
||||||
|
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
|
||||||
|
ActiveConnections(Vec<ConnectionId>),
|
||||||
|
}
|
||||||
|
|
||||||
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||||
// if needed.
|
// if needed.
|
||||||
|
|||||||
@@ -1,14 +1,18 @@
|
|||||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use client_connections::ClosedConnectionSender;
|
use client_connections::{ClosedConnectionSender, ConnectionCommand};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
use log::*;
|
||||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||||
use socks5_requests::ConnectionId;
|
use socks5_requests::ConnectionId;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
use task::ShutdownListener;
|
use task::ShutdownListener;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
/// A generic message produced after reading from a socket/connection. It includes data that was
|
/// A generic message produced after reading from a socket/connection. It includes data that was
|
||||||
/// actually read alongside boolean indicating whether the connection got closed so that
|
/// actually read alongside boolean indicating whether the connection got closed so that
|
||||||
@@ -77,6 +81,8 @@ pub struct Controller {
|
|||||||
// Broadcast closed connections
|
// Broadcast closed connections
|
||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
|
|
||||||
|
broadcast_connections: bool,
|
||||||
|
|
||||||
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
|
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
|
||||||
// how to handle it more gracefully
|
// how to handle it more gracefully
|
||||||
|
|
||||||
@@ -90,6 +96,7 @@ pub struct Controller {
|
|||||||
impl Controller {
|
impl Controller {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
closed_connection_tx: ClosedConnectionSender,
|
closed_connection_tx: ClosedConnectionSender,
|
||||||
|
broadcast_connections: bool,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) -> (Self, ControllerSender) {
|
) -> (Self, ControllerSender) {
|
||||||
let (sender, receiver) = mpsc::unbounded();
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
@@ -99,6 +106,7 @@ impl Controller {
|
|||||||
receiver,
|
receiver,
|
||||||
recently_closed: HashSet::new(),
|
recently_closed: HashSet::new(),
|
||||||
closed_connection_tx,
|
closed_connection_tx,
|
||||||
|
broadcast_connections,
|
||||||
pending_messages: HashMap::new(),
|
pending_messages: HashMap::new(),
|
||||||
shutdown,
|
shutdown,
|
||||||
},
|
},
|
||||||
@@ -137,7 +145,18 @@ impl Controller {
|
|||||||
self.recently_closed.insert(conn_id);
|
self.recently_closed.insert(conn_id);
|
||||||
|
|
||||||
// Announce closed connections, currently used by the `OutQueueControl`.
|
// Announce closed connections, currently used by the `OutQueueControl`.
|
||||||
self.closed_connection_tx.unbounded_send(conn_id).unwrap();
|
self.closed_connection_tx
|
||||||
|
.unbounded_send(ConnectionCommand::Close(conn_id))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn broadcast_active_connections(&mut self) {
|
||||||
|
// What about the recently closed ones? Hopefully we can ignore them ...
|
||||||
|
let conn_ids = self.active_connections.keys().copied().collect();
|
||||||
|
|
||||||
|
self.closed_connection_tx
|
||||||
|
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||||
@@ -196,6 +215,8 @@ impl Controller {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) {
|
pub async fn run(&mut self) {
|
||||||
|
let mut interval = time::interval(Duration::from_millis(500));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
command = self.receiver.next() => match command {
|
command = self.receiver.next() => match command {
|
||||||
@@ -210,7 +231,12 @@ impl Controller {
|
|||||||
log::trace!("SOCKS5 Controller: Stopping since channel closed");
|
log::trace!("SOCKS5 Controller: Stopping since channel closed");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
_ = interval.tick() => {
|
||||||
|
if self.broadcast_connections {
|
||||||
|
self.broadcast_active_connections();
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert!(self.shutdown.is_shutdown_poll());
|
assert!(self.shutdown.is_shutdown_poll());
|
||||||
|
|||||||
@@ -106,23 +106,37 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
||||||
wait_for_lane(
|
if tokio::time::timeout(
|
||||||
lane_queue_lengths,
|
Duration::from_secs(15),
|
||||||
connection_id,
|
wait_for_lane(
|
||||||
0,
|
lane_queue_lengths,
|
||||||
Duration::from_millis(500),
|
connection_id,
|
||||||
|
0,
|
||||||
|
Duration::from_millis(500),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
log::warn!("Wait until lane empty timed out");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
||||||
wait_for_lane(
|
if tokio::time::timeout(
|
||||||
lane_queue_lengths,
|
Duration::from_secs(15),
|
||||||
connection_id,
|
wait_for_lane(
|
||||||
10,
|
lane_queue_lengths,
|
||||||
Duration::from_millis(100),
|
connection_id,
|
||||||
|
10,
|
||||||
|
Duration::from_millis(100),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
log::warn!("Wait until lane almost empty timed out");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_lane(
|
async fn wait_for_lane(
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use client_connections::LaneQueueLengths;
|
||||||
use nymsphinx::addressing::clients::Recipient;
|
use nymsphinx::addressing::clients::Recipient;
|
||||||
use proxy_helpers::connection_controller::ConnectionReceiver;
|
use proxy_helpers::connection_controller::ConnectionReceiver;
|
||||||
use proxy_helpers::proxy_runner::{MixProxySender, ProxyRunner};
|
use proxy_helpers::proxy_runner::{MixProxySender, ProxyRunner};
|
||||||
@@ -40,6 +41,7 @@ impl Connection {
|
|||||||
&mut self,
|
&mut self,
|
||||||
mix_receiver: ConnectionReceiver,
|
mix_receiver: ConnectionReceiver,
|
||||||
mix_sender: MixProxySender<(Socks5Message, Recipient)>,
|
mix_sender: MixProxySender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let stream = self.conn.take().unwrap();
|
let stream = self.conn.take().unwrap();
|
||||||
@@ -53,7 +55,7 @@ impl Connection {
|
|||||||
mix_receiver,
|
mix_receiver,
|
||||||
mix_sender,
|
mix_sender,
|
||||||
connection_id,
|
connection_id,
|
||||||
None,
|
Some(lane_queue_lengths),
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
.run(move |conn_id, read_data, socket_closed| {
|
.run(move |conn_id, read_data, socket_closed| {
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ use crate::error::NetworkRequesterError;
|
|||||||
use crate::statistics::ServiceStatisticsCollector;
|
use crate::statistics::ServiceStatisticsCollector;
|
||||||
use crate::websocket;
|
use crate::websocket;
|
||||||
use crate::websocket::TSWebsocketStream;
|
use crate::websocket::TSWebsocketStream;
|
||||||
use client_connections::ClosedConnectionReceiver;
|
use client_connections::{
|
||||||
|
ClosedConnectionReceiver, ConnectionCommand, LaneQueueLengths, TransmissionLane,
|
||||||
|
};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::stream::{SplitSink, SplitStream};
|
use futures::stream::{SplitSink, SplitStream};
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
@@ -108,17 +110,49 @@ impl ServiceProvider {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Some(id) = closed_connection_rx.next() => {
|
Some(command) = closed_connection_rx.next() => {
|
||||||
let msg = ClientRequest::ClosedConnection(id);
|
match command {
|
||||||
let ws_msg = Message::Binary(msg.serialize());
|
ConnectionCommand::Close(id) => {
|
||||||
websocket_writer.send(ws_msg).await.unwrap();
|
let msg = ClientRequest::ClosedConnection(id);
|
||||||
}
|
let ws_msg = Message::Binary(msg.serialize());
|
||||||
|
websocket_writer.send(ws_msg).await.unwrap();
|
||||||
|
}
|
||||||
|
ConnectionCommand::ActiveConnections(ids) => {
|
||||||
|
// We can optimize this by sending a single request, but this is
|
||||||
|
// usually in the low single digits, max a few tens, so we leave that
|
||||||
|
// for a rainy day.
|
||||||
|
// Also that means fiddling with the currently manual
|
||||||
|
// serialize/deserialize we do with ClientRequests ... bleh
|
||||||
|
for id in ids {
|
||||||
|
log::info!("Requesting lane queue length for: {}", id);
|
||||||
|
let msg = ClientRequest::GetLaneQueueLength(id);
|
||||||
|
let ws_msg = Message::Binary(msg.serialize());
|
||||||
|
websocket_writer.send(ws_msg).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_lane_queue_length_response(
|
||||||
|
lane_queue_lengths: &LaneQueueLengths,
|
||||||
|
lane: u64,
|
||||||
|
queue_length: usize,
|
||||||
|
) {
|
||||||
|
log::info!("received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
|
||||||
|
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
|
||||||
|
let lane = TransmissionLane::ConnectionId(lane);
|
||||||
|
lane_queue_lengths.map.insert(lane, queue_length);
|
||||||
|
} else {
|
||||||
|
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn read_websocket_message(
|
async fn read_websocket_message(
|
||||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
) -> Option<ReconstructedMessage> {
|
) -> Option<ReconstructedMessage> {
|
||||||
while let Some(msg) = websocket_reader.next().await {
|
while let Some(msg) = websocket_reader.next().await {
|
||||||
let data = msg
|
let data = msg
|
||||||
@@ -139,6 +173,14 @@ impl ServiceProvider {
|
|||||||
|
|
||||||
let received = match deserialized_message {
|
let received = match deserialized_message {
|
||||||
ServerResponse::Received(received) => received,
|
ServerResponse::Received(received) => received,
|
||||||
|
ServerResponse::LaneQueueLength(lane, queue_length) => {
|
||||||
|
Self::handle_lane_queue_length_response(
|
||||||
|
&lane_queue_lengths,
|
||||||
|
lane,
|
||||||
|
queue_length,
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
ServerResponse::Error(err) => {
|
ServerResponse::Error(err) => {
|
||||||
panic!("received error from native client! - {}", err)
|
panic!("received error from native client! - {}", err)
|
||||||
}
|
}
|
||||||
@@ -155,6 +197,7 @@ impl ServiceProvider {
|
|||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
controller_sender: ControllerSender,
|
controller_sender: ControllerSender,
|
||||||
mix_input_sender: MixProxySender<(Socks5Message, Recipient)>,
|
mix_input_sender: MixProxySender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||||
@@ -196,7 +239,7 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// run the proxy on the connection
|
// run the proxy on the connection
|
||||||
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
|
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// proxy is done - remove the access channel from the controller
|
// proxy is done - remove the access channel from the controller
|
||||||
@@ -212,10 +255,12 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn handle_proxy_connect(
|
async fn handle_proxy_connect(
|
||||||
&mut self,
|
&mut self,
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
remote_addr: String,
|
remote_addr: String,
|
||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
@@ -250,6 +295,7 @@ impl ServiceProvider {
|
|||||||
return_address,
|
return_address,
|
||||||
controller_sender_clone,
|
controller_sender_clone,
|
||||||
mix_input_sender_clone,
|
mix_input_sender_clone,
|
||||||
|
lane_queue_lengths,
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -257,7 +303,6 @@ impl ServiceProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_proxy_send(
|
fn handle_proxy_send(
|
||||||
&self,
|
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
@@ -273,6 +318,7 @@ impl ServiceProvider {
|
|||||||
raw_request: &[u8],
|
raw_request: &[u8],
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
||||||
|
lane_queue_lengths: LaneQueueLengths,
|
||||||
stats_collector: Option<ServiceStatisticsCollector>,
|
stats_collector: Option<ServiceStatisticsCollector>,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
@@ -296,6 +342,7 @@ impl ServiceProvider {
|
|||||||
self.handle_proxy_connect(
|
self.handle_proxy_connect(
|
||||||
controller_sender,
|
controller_sender,
|
||||||
mix_input_sender,
|
mix_input_sender,
|
||||||
|
lane_queue_lengths,
|
||||||
req.conn_id,
|
req.conn_id,
|
||||||
req.remote_addr,
|
req.remote_addr,
|
||||||
req.return_address,
|
req.return_address,
|
||||||
@@ -319,7 +366,7 @@ impl ServiceProvider {
|
|||||||
.processed(remote_addr, data.len() as u32);
|
.processed(remote_addr, data.len() as u32);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.handle_proxy_send(controller_sender, conn_id, data, closed)
|
Self::handle_proxy_send(controller_sender, conn_id, data, closed)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Socks5Message::Response(_) | Socks5Message::NetworkRequesterResponse(_) => {}
|
Socks5Message::Response(_) | Socks5Message::NetworkRequesterResponse(_) => {}
|
||||||
@@ -346,11 +393,15 @@ impl ServiceProvider {
|
|||||||
// `ClientRequest`.
|
// `ClientRequest`.
|
||||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||||
|
// primarily to throttle incoming connections
|
||||||
|
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||||
|
|
||||||
// Controller for managing all active connections.
|
// Controller for managing all active connections.
|
||||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||||
// requester shutdown signalling is not yet fully implemented.
|
// requester shutdown signalling is not yet fully implemented.
|
||||||
let (mut active_connections_controller, mut controller_sender) =
|
let (mut active_connections_controller, mut controller_sender) =
|
||||||
Controller::new(closed_connection_tx, shutdown.subscribe());
|
Controller::new(closed_connection_tx, true, shutdown.subscribe());
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
active_connections_controller.run().await;
|
active_connections_controller.run().await;
|
||||||
@@ -386,12 +437,14 @@ impl ServiceProvider {
|
|||||||
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
||||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||||
loop {
|
loop {
|
||||||
let received = match Self::read_websocket_message(&mut websocket_reader).await {
|
let Some(received) = Self::read_websocket_message(
|
||||||
Some(msg) => msg,
|
&mut websocket_reader,
|
||||||
None => {
|
shared_lane_queue_lengths.clone()
|
||||||
error!("The websocket stream has finished!");
|
)
|
||||||
return Ok(());
|
.await
|
||||||
}
|
else {
|
||||||
|
log::error!("The websocket stream has finished!");
|
||||||
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let raw_message = received.message;
|
let raw_message = received.message;
|
||||||
@@ -401,6 +454,7 @@ impl ServiceProvider {
|
|||||||
&raw_message,
|
&raw_message,
|
||||||
&mut controller_sender,
|
&mut controller_sender,
|
||||||
&mix_input_sender,
|
&mix_input_sender,
|
||||||
|
shared_lane_queue_lengths.clone(),
|
||||||
stats_collector.clone(),
|
stats_collector.clone(),
|
||||||
shutdown.subscribe(),
|
shutdown.subscribe(),
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user