Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c61eb014c5 | |||
| c10f788a89 | |||
| 73fa863a6a | |||
| a561f1b854 | |||
| 1c379fa0db | |||
| 31778c6e51 | |||
| 9d0562168e | |||
| 701f603825 | |||
| 97b250433e |
@@ -5,7 +5,7 @@ use crate::client::mix_traffic::BatchMixMessageSender;
|
||||
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use client_connections::{
|
||||
ClosedConnectionReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
ClosedConnectionReceiver, ConnectionCommand, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
@@ -336,7 +336,10 @@ where
|
||||
// NOTE: this feels a bit iffy, the `OutQueueControl` is getting ripe for a rewrite to
|
||||
// something simpler.
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
||||
self.on_close_connection(id);
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||
@@ -412,7 +415,10 @@ where
|
||||
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||
// Start by checking if we have any incoming messages about closed connections
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.closed_connection_rx).poll_next(cx) {
|
||||
self.on_close_connection(id);
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||
|
||||
@@ -288,6 +288,7 @@ impl NymClient {
|
||||
&self,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
shared_lane_queue_lengths: LaneQueueLengths,
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
@@ -297,6 +298,7 @@ impl NymClient {
|
||||
closed_connection_tx,
|
||||
buffer_requester,
|
||||
&self.as_mix_recipient(),
|
||||
shared_lane_queue_lengths,
|
||||
);
|
||||
|
||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||
@@ -442,7 +444,7 @@ impl NymClient {
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
|
||||
let shared_lane_queue_length = LaneQueueLengths::new();
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
@@ -450,7 +452,7 @@ impl NymClient {
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
shared_lane_queue_length,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
closed_connection_rx,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
@@ -471,6 +473,7 @@ impl NymClient {
|
||||
SocketType::WebSocket => self.start_websocket_listener(
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
shared_lane_queue_lengths,
|
||||
closed_connection_tx,
|
||||
),
|
||||
SocketType::None => {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::{ClosedConnectionSender, TransmissionLane};
|
||||
use client_connections::{
|
||||
ClosedConnectionSender, ConnectionCommand, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use client_core::client::{
|
||||
inbound_messages::{InputMessage, InputMessageSender},
|
||||
received_buffer::{
|
||||
@@ -40,6 +42,7 @@ pub(crate) struct Handler {
|
||||
self_full_address: Recipient,
|
||||
socket: Option<WebSocketStream<TcpStream>>,
|
||||
received_response_type: ReceivedResponseType,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
}
|
||||
|
||||
// clone is used to use handler on a new connection, which initially is `None`
|
||||
@@ -52,6 +55,7 @@ impl Clone for Handler {
|
||||
self_full_address: self.self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -70,6 +74,7 @@ impl Handler {
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
self_full_address: &Recipient,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Self {
|
||||
Handler {
|
||||
msg_input,
|
||||
@@ -78,6 +83,7 @@ impl Handler {
|
||||
self_full_address: *self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
lane_queue_lengths,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,6 +101,15 @@ impl Handler {
|
||||
panic!();
|
||||
}
|
||||
|
||||
// on receiving a send, we reply back the current lane queue length for that connection id.
|
||||
// Note that this does _NOT_ take into account the packets that have been received but not
|
||||
// yet reach `OutQueueControl`, so it might be a tad low.
|
||||
if let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() {
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
return Some(ServerResponse::LaneQueueLength(connection_id, queue_length));
|
||||
}
|
||||
|
||||
log::warn!("Failed to get the lane queue length lock, not responding back with the current queue length");
|
||||
None
|
||||
}
|
||||
|
||||
@@ -121,11 +136,24 @@ impl Handler {
|
||||
|
||||
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
self.closed_connection_tx
|
||||
.unbounded_send(connection_id)
|
||||
.unbounded_send(ConnectionCommand::Close(connection_id))
|
||||
.unwrap();
|
||||
None
|
||||
}
|
||||
|
||||
fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
|
||||
log::warn!(
|
||||
"Failed to get the lane queue length lock, not responding back with the current queue length"
|
||||
);
|
||||
return None;
|
||||
};
|
||||
|
||||
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
|
||||
Some(ServerResponse::LaneQueueLength(connection_id, queue_length))
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||
match request {
|
||||
ClientRequest::Send {
|
||||
@@ -143,6 +171,7 @@ impl Handler {
|
||||
} => self.handle_reply(reply_surb, message).await,
|
||||
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
||||
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
|
||||
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,9 @@ pub const SELF_ADDRESS_REQUEST_TAG: u8 = 0x02;
|
||||
/// Value tag representing [`ClosedConnection`] variant of the [`ClientRequest`]
|
||||
pub const CLOSED_CONNECTION_REQUEST_TAG: u8 = 0x03;
|
||||
|
||||
/// Value tag representing [`GetLaneQueueLength`] variant of the [`ClientRequest`]
|
||||
pub const GET_LANE_QUEUE_LENGHT_TAG: u8 = 0x04;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
#[derive(Debug)]
|
||||
pub enum ClientRequest {
|
||||
@@ -39,6 +42,7 @@ pub enum ClientRequest {
|
||||
},
|
||||
SelfAddress,
|
||||
ClosedConnection(u64),
|
||||
GetLaneQueueLength(u64),
|
||||
}
|
||||
|
||||
// we could have been parsing it directly TryFrom<WsMessage>, but we want to retain
|
||||
@@ -241,6 +245,26 @@ impl ClientRequest {
|
||||
ClientRequest::ClosedConnection(connection_id)
|
||||
}
|
||||
|
||||
// GET_LANE_QUEUE_LENGHT_TAG
|
||||
fn serialize_get_lane_queue_lengths(connection_id: u64) -> Vec<u8> {
|
||||
let conn_id_bytes = connection_id.to_be_bytes();
|
||||
std::iter::once(GET_LANE_QUEUE_LENGHT_TAG)
|
||||
.chain(conn_id_bytes.iter().copied())
|
||||
.collect()
|
||||
}
|
||||
|
||||
// GET_LANE_QUEUE_LENGHT_TAG
|
||||
fn deserialize_get_lane_queue_length(b: &[u8]) -> Self {
|
||||
// this MUST match because it was called by 'deserialize'
|
||||
debug_assert_eq!(b[0], GET_LANE_QUEUE_LENGHT_TAG);
|
||||
|
||||
let mut connection_id_bytes = [0u8; size_of::<u64>()];
|
||||
connection_id_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
|
||||
let connection_id = u64::from_be_bytes(connection_id_bytes);
|
||||
|
||||
ClientRequest::GetLaneQueueLength(connection_id)
|
||||
}
|
||||
|
||||
pub fn serialize(self) -> Vec<u8> {
|
||||
match self {
|
||||
ClientRequest::Send {
|
||||
@@ -258,6 +282,8 @@ impl ClientRequest {
|
||||
ClientRequest::SelfAddress => Self::serialize_self_address(),
|
||||
|
||||
ClientRequest::ClosedConnection(id) => Self::serialize_closed_connection(id),
|
||||
|
||||
ClientRequest::GetLaneQueueLength(id) => Self::serialize_get_lane_queue_lengths(id),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -288,6 +314,7 @@ impl ClientRequest {
|
||||
REPLY_REQUEST_TAG => Self::deserialize_reply(b),
|
||||
SELF_ADDRESS_REQUEST_TAG => Ok(Self::deserialize_self_address(b)),
|
||||
CLOSED_CONNECTION_REQUEST_TAG => Ok(Self::deserialize_closed_connection(b)),
|
||||
GET_LANE_QUEUE_LENGHT_TAG => Ok(Self::deserialize_get_lane_queue_length(b)),
|
||||
n => Err(error::Error::new(
|
||||
ErrorKind::UnknownRequest,
|
||||
format!("type {n}"),
|
||||
|
||||
@@ -62,7 +62,7 @@ impl SphinxSocksServer {
|
||||
|
||||
// controller for managing all active connections
|
||||
let (mut active_streams_controller, controller_sender) =
|
||||
Controller::new(closed_connection_tx, self.shutdown.clone());
|
||||
Controller::new(closed_connection_tx, false, self.shutdown.clone());
|
||||
tokio::spawn(async move {
|
||||
active_streams_controller.run().await;
|
||||
});
|
||||
|
||||
@@ -19,8 +19,20 @@ pub enum TransmissionLane {
|
||||
/// Announce connections that are closed, for whoever is interested.
|
||||
/// One usecase is that the network-requester and socks5-client wants to know about this, so that
|
||||
/// they can forward this to the `OutQueueControl` (via `ClientRequest` for the network-requester)
|
||||
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionId>;
|
||||
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionId>;
|
||||
pub type ClosedConnectionSender = mpsc::UnboundedSender<ConnectionCommand>;
|
||||
pub type ClosedConnectionReceiver = mpsc::UnboundedReceiver<ConnectionCommand>;
|
||||
|
||||
pub enum ConnectionCommand {
|
||||
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
|
||||
// transmission lanes.
|
||||
Close(ConnectionId),
|
||||
|
||||
// In the network requester for example, we usually want to broadcast active connections
|
||||
// regularly, so we know what connections we need to request lane queue lengths for from the
|
||||
// client.
|
||||
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
|
||||
ActiveConnections(Vec<ConnectionId>),
|
||||
}
|
||||
|
||||
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||
// if needed.
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::ClosedConnectionSender;
|
||||
use client_connections::{ClosedConnectionSender, ConnectionCommand};
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
use task::ShutdownListener;
|
||||
use tokio::time;
|
||||
|
||||
/// A generic message produced after reading from a socket/connection. It includes data that was
|
||||
/// actually read alongside boolean indicating whether the connection got closed so that
|
||||
@@ -77,6 +81,8 @@ pub struct Controller {
|
||||
// Broadcast closed connections
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
|
||||
broadcast_connections: bool,
|
||||
|
||||
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
|
||||
// how to handle it more gracefully
|
||||
|
||||
@@ -90,6 +96,7 @@ pub struct Controller {
|
||||
impl Controller {
|
||||
pub fn new(
|
||||
closed_connection_tx: ClosedConnectionSender,
|
||||
broadcast_connections: bool,
|
||||
shutdown: ShutdownListener,
|
||||
) -> (Self, ControllerSender) {
|
||||
let (sender, receiver) = mpsc::unbounded();
|
||||
@@ -99,6 +106,7 @@ impl Controller {
|
||||
receiver,
|
||||
recently_closed: HashSet::new(),
|
||||
closed_connection_tx,
|
||||
broadcast_connections,
|
||||
pending_messages: HashMap::new(),
|
||||
shutdown,
|
||||
},
|
||||
@@ -137,7 +145,18 @@ impl Controller {
|
||||
self.recently_closed.insert(conn_id);
|
||||
|
||||
// Announce closed connections, currently used by the `OutQueueControl`.
|
||||
self.closed_connection_tx.unbounded_send(conn_id).unwrap();
|
||||
self.closed_connection_tx
|
||||
.unbounded_send(ConnectionCommand::Close(conn_id))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn broadcast_active_connections(&mut self) {
|
||||
// What about the recently closed ones? Hopefully we can ignore them ...
|
||||
let conn_ids = self.active_connections.keys().copied().collect();
|
||||
|
||||
self.closed_connection_tx
|
||||
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||
@@ -196,6 +215,8 @@ impl Controller {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
let mut interval = time::interval(Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = self.receiver.next() => match command {
|
||||
@@ -210,7 +231,12 @@ impl Controller {
|
||||
log::trace!("SOCKS5 Controller: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
if self.broadcast_connections {
|
||||
self.broadcast_active_connections();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
assert!(self.shutdown.is_shutdown_poll());
|
||||
|
||||
@@ -106,23 +106,37 @@ where
|
||||
}
|
||||
|
||||
async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
||||
wait_for_lane(
|
||||
lane_queue_lengths,
|
||||
connection_id,
|
||||
0,
|
||||
Duration::from_millis(500),
|
||||
if tokio::time::timeout(
|
||||
Duration::from_secs(15),
|
||||
wait_for_lane(
|
||||
lane_queue_lengths,
|
||||
connection_id,
|
||||
0,
|
||||
Duration::from_millis(500),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
log::warn!("Wait until lane empty timed out");
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
|
||||
wait_for_lane(
|
||||
lane_queue_lengths,
|
||||
connection_id,
|
||||
10,
|
||||
Duration::from_millis(100),
|
||||
if tokio::time::timeout(
|
||||
Duration::from_secs(15),
|
||||
wait_for_lane(
|
||||
lane_queue_lengths,
|
||||
connection_id,
|
||||
10,
|
||||
Duration::from_millis(100),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
log::warn!("Wait until lane almost empty timed out");
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_lane(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use client_connections::LaneQueueLengths;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::ConnectionReceiver;
|
||||
use proxy_helpers::proxy_runner::{MixProxySender, ProxyRunner};
|
||||
@@ -40,6 +41,7 @@ impl Connection {
|
||||
&mut self,
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: MixProxySender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let stream = self.conn.take().unwrap();
|
||||
@@ -53,7 +55,7 @@ impl Connection {
|
||||
mix_receiver,
|
||||
mix_sender,
|
||||
connection_id,
|
||||
None,
|
||||
Some(lane_queue_lengths),
|
||||
shutdown,
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
|
||||
@@ -7,7 +7,9 @@ use crate::error::NetworkRequesterError;
|
||||
use crate::statistics::ServiceStatisticsCollector;
|
||||
use crate::websocket;
|
||||
use crate::websocket::TSWebsocketStream;
|
||||
use client_connections::ClosedConnectionReceiver;
|
||||
use client_connections::{
|
||||
ClosedConnectionReceiver, ConnectionCommand, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
@@ -108,17 +110,49 @@ impl ServiceProvider {
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some(id) = closed_connection_rx.next() => {
|
||||
let msg = ClientRequest::ClosedConnection(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
Some(command) = closed_connection_rx.next() => {
|
||||
match command {
|
||||
ConnectionCommand::Close(id) => {
|
||||
let msg = ClientRequest::ClosedConnection(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
ConnectionCommand::ActiveConnections(ids) => {
|
||||
// We can optimize this by sending a single request, but this is
|
||||
// usually in the low single digits, max a few tens, so we leave that
|
||||
// for a rainy day.
|
||||
// Also that means fiddling with the currently manual
|
||||
// serialize/deserialize we do with ClientRequests ... bleh
|
||||
for id in ids {
|
||||
log::info!("Requesting lane queue length for: {}", id);
|
||||
let msg = ClientRequest::GetLaneQueueLength(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_lane_queue_length_response(
|
||||
lane_queue_lengths: &LaneQueueLengths,
|
||||
lane: u64,
|
||||
queue_length: usize,
|
||||
) {
|
||||
log::info!("received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
|
||||
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
|
||||
let lane = TransmissionLane::ConnectionId(lane);
|
||||
lane_queue_lengths.map.insert(lane, queue_length);
|
||||
} else {
|
||||
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_websocket_message(
|
||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Option<ReconstructedMessage> {
|
||||
while let Some(msg) = websocket_reader.next().await {
|
||||
let data = msg
|
||||
@@ -139,6 +173,14 @@ impl ServiceProvider {
|
||||
|
||||
let received = match deserialized_message {
|
||||
ServerResponse::Received(received) => received,
|
||||
ServerResponse::LaneQueueLength(lane, queue_length) => {
|
||||
Self::handle_lane_queue_length_response(
|
||||
&lane_queue_lengths,
|
||||
lane,
|
||||
queue_length,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
ServerResponse::Error(err) => {
|
||||
panic!("received error from native client! - {}", err)
|
||||
}
|
||||
@@ -155,6 +197,7 @@ impl ServiceProvider {
|
||||
return_address: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
mix_input_sender: MixProxySender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||
@@ -196,7 +239,7 @@ impl ServiceProvider {
|
||||
);
|
||||
|
||||
// run the proxy on the connection
|
||||
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
|
||||
conn.run_proxy(mix_receiver, mix_input_sender, lane_queue_lengths, shutdown)
|
||||
.await;
|
||||
|
||||
// proxy is done - remove the access channel from the controller
|
||||
@@ -212,10 +255,12 @@ impl ServiceProvider {
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_proxy_connect(
|
||||
&mut self,
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
return_address: Recipient,
|
||||
@@ -250,6 +295,7 @@ impl ServiceProvider {
|
||||
return_address,
|
||||
controller_sender_clone,
|
||||
mix_input_sender_clone,
|
||||
lane_queue_lengths,
|
||||
shutdown,
|
||||
)
|
||||
.await
|
||||
@@ -257,7 +303,6 @@ impl ServiceProvider {
|
||||
}
|
||||
|
||||
fn handle_proxy_send(
|
||||
&self,
|
||||
controller_sender: &mut ControllerSender,
|
||||
conn_id: ConnectionId,
|
||||
data: Vec<u8>,
|
||||
@@ -273,6 +318,7 @@ impl ServiceProvider {
|
||||
raw_request: &[u8],
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &MixProxySender<(Socks5Message, Recipient)>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
@@ -296,6 +342,7 @@ impl ServiceProvider {
|
||||
self.handle_proxy_connect(
|
||||
controller_sender,
|
||||
mix_input_sender,
|
||||
lane_queue_lengths,
|
||||
req.conn_id,
|
||||
req.remote_addr,
|
||||
req.return_address,
|
||||
@@ -319,7 +366,7 @@ impl ServiceProvider {
|
||||
.processed(remote_addr, data.len() as u32);
|
||||
}
|
||||
}
|
||||
self.handle_proxy_send(controller_sender, conn_id, data, closed)
|
||||
Self::handle_proxy_send(controller_sender, conn_id, data, closed)
|
||||
}
|
||||
},
|
||||
Socks5Message::Response(_) | Socks5Message::NetworkRequesterResponse(_) => {}
|
||||
@@ -346,11 +393,15 @@ impl ServiceProvider {
|
||||
// `ClientRequest`.
|
||||
let (closed_connection_tx, closed_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
// requester shutdown signalling is not yet fully implemented.
|
||||
let (mut active_connections_controller, mut controller_sender) =
|
||||
Controller::new(closed_connection_tx, shutdown.subscribe());
|
||||
Controller::new(closed_connection_tx, true, shutdown.subscribe());
|
||||
|
||||
tokio::spawn(async move {
|
||||
active_connections_controller.run().await;
|
||||
@@ -386,12 +437,14 @@ impl ServiceProvider {
|
||||
println!("\nAll systems go. Press CTRL-C to stop the server.");
|
||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||
loop {
|
||||
let received = match Self::read_websocket_message(&mut websocket_reader).await {
|
||||
Some(msg) => msg,
|
||||
None => {
|
||||
error!("The websocket stream has finished!");
|
||||
return Ok(());
|
||||
}
|
||||
let Some(received) = Self::read_websocket_message(
|
||||
&mut websocket_reader,
|
||||
shared_lane_queue_lengths.clone()
|
||||
)
|
||||
.await
|
||||
else {
|
||||
log::error!("The websocket stream has finished!");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let raw_message = received.message;
|
||||
@@ -401,6 +454,7 @@ impl ServiceProvider {
|
||||
&raw_message,
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user