Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad a9428d4f7b wip 2023-02-06 17:32:40 +01:00
Jon Häggblad 067ee0db74 wip 2023-02-06 17:18:33 +01:00
Jon Häggblad e85974d9a5 wip 2023-02-06 15:01:34 +01:00
Jon Häggblad 000100852b wip 2023-02-06 10:58:27 +01:00
Jon Häggblad f66110eaa9 minimal poc 2023-02-05 17:10:04 +01:00
14 changed files with 94 additions and 268 deletions
Generated
+1
View File
@@ -3743,6 +3743,7 @@ dependencies = [
"log",
"logging",
"network-defaults",
"nym-sdk",
"nymsphinx",
"ordered-buffer",
"pretty_env_logger",
@@ -51,11 +51,22 @@ pub mod non_wasm_helpers;
pub mod helpers;
#[derive(Clone)]
pub struct ClientInput {
pub connection_command_sender: ConnectionCommandSender,
pub input_sender: InputMessageSender,
}
impl ClientInput {
pub async fn send(
&self,
message: InputMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
self.input_sender.send(message).await
}
}
#[derive(Clone)]
pub struct ClientOutput {
pub received_buffer_request_sender: ReceivedBufferRequestSender,
}
@@ -342,7 +342,6 @@ where
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
@@ -421,7 +420,6 @@ where
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
+2 -2
View File
@@ -10,7 +10,7 @@ use client_core::client::{
};
use log::*;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
use proxy_helpers::connection_controller::Controller;
use std::net::SocketAddr;
use tap::TapFallible;
use task::TaskClient;
@@ -69,7 +69,7 @@ impl SphinxSocksServer {
// controller for managing all active connections
let (mut active_streams_controller, controller_sender) = Controller::new(
client_connection_tx,
BroadcastActiveConnections::Off,
//BroadcastActiveConnections::Off,
self.shutdown.clone(),
);
tokio::spawn(async move {
-6
View File
@@ -25,12 +25,6 @@ pub enum ConnectionCommand {
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
// transmission lanes.
Close(ConnectionId),
// In the network requester for example, we usually want to broadcast active connections
// regularly, so we know what connections we need to request lane queue lengths for from the
// client.
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
ActiveConnections(Vec<ConnectionId>),
}
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
@@ -7,12 +7,8 @@ use futures::StreamExt;
use log::*;
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
use socks5_requests::ConnectionId;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use std::collections::{HashMap, HashSet};
use task::TaskClient;
use tokio::time;
/// A generic message produced after reading from a socket/connection. It includes data that was
/// actually read alongside boolean indicating whether the connection got closed so that
@@ -87,10 +83,6 @@ pub struct Controller {
// Broadcast closed connections
client_connection_tx: ConnectionCommandSender,
// The controller can broadcast active connections. This is useful in the network-requester
// where its used to query the client for lane queue lengths
broadcast_connections: BroadcastActiveConnections,
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
// how to handle it more gracefully
@@ -104,7 +96,6 @@ pub struct Controller {
impl Controller {
pub fn new(
client_connection_tx: ConnectionCommandSender,
broadcast_connections: BroadcastActiveConnections,
shutdown: TaskClient,
) -> (Self, ControllerSender) {
let (sender, receiver) = mpsc::unbounded();
@@ -114,7 +105,6 @@ impl Controller {
receiver,
recently_closed: HashSet::new(),
client_connection_tx,
broadcast_connections,
pending_messages: HashMap::new(),
shutdown,
},
@@ -165,15 +155,6 @@ impl Controller {
}
}
fn broadcast_active_connections(&mut self) {
// What about the recently closed ones? Hopefully we can ignore them ...
let conn_ids = self.active_connections.keys().copied().collect();
self.client_connection_tx
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
.unwrap();
}
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
if !payload.is_empty() {
@@ -230,8 +211,6 @@ impl Controller {
}
pub async fn run(&mut self) {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
command = self.receiver.next() => match command {
@@ -247,11 +226,6 @@ impl Controller {
break;
}
},
_ = interval.tick() => {
if self.broadcast_connections == BroadcastActiveConnections::On {
self.broadcast_active_connections();
}
},
}
}
self.shutdown.recv_timeout().await;
+2 -2
View File
@@ -36,8 +36,8 @@ mod connection_state;
mod keys;
mod paths;
pub use client::{MixnetClient, MixnetClientBuilder};
pub use client_core::config::GatewayEndpointConfig;
pub use client::{MixnetClient, MixnetClientBuilder, MixnetClientSender};
pub use client_core::{client::inbound_messages::InputMessage, config::GatewayEndpointConfig};
pub use config::Config;
pub use keys::{Keys, KeysArc};
pub use nymsphinx::{
+36 -9
View File
@@ -268,7 +268,6 @@ pub struct MixnetClient {
/// The current state of the client that is exposed to the user. This includes things like
/// current message send queue length.
#[allow(dead_code)]
client_state: ClientState,
/// A channel for messages arriving from the mixnet after they have been reconstructed.
@@ -421,6 +420,23 @@ impl MixnetClient {
&self.nym_address
}
/// Get a shallow clone of [`MixnetClientSender`]
pub fn sender(&self) -> MixnetClientSender {
MixnetClientSender {
client_input: self.client_input.clone(),
}
}
/// Get a shallow clone of [`ConnectionCommandSender`].
pub fn connection_command_sender(&self) -> client_connections::ConnectionCommandSender {
self.client_input.connection_command_sender.clone()
}
/// Get a shallow clone of [`LaneQueueLengths`].
pub fn shared_lane_queue_lengths(&self) -> client_connections::LaneQueueLengths {
self.client_state.shared_lane_queue_lengths.clone()
}
/// Sends stringy data to the supplied Nym address
pub async fn send_str(&self, address: Recipient, message: &str) {
let message_bytes = message.to_string().into_bytes();
@@ -430,7 +446,7 @@ impl MixnetClient {
/// Sends stringy data to the supplied Nym address, and skip sending reply-SURBs
pub async fn send_str_direct(&self, address: Recipient, message: &str) {
let message_bytes = message.to_string().into_bytes();
self.send_bytes(address, message_bytes).await;
self.send_bytes_direct(address, message_bytes).await;
}
/// Sends bytes to the supplied Nym address
@@ -451,13 +467,12 @@ impl MixnetClient {
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>) {
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_anonymous(address, message, 20, lane);
if self
.client_input
.input_sender
.send(input_msg)
.await
.is_err()
{
self.send_input_message(input_msg).await
}
/// Sends a [`InputMessage`] to the mixnet.
async fn send_input_message(&self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
}
}
@@ -501,3 +516,15 @@ impl MixnetClient {
self.task_manager.wait_for_shutdown().await;
}
}
pub struct MixnetClientSender {
client_input: ClientInput,
}
impl MixnetClientSender {
pub async fn send_input_message(&mut self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
}
}
}
@@ -31,9 +31,10 @@ tokio-tungstenite = "0.17.2"
# internal
client-connections = { path = "../../common/client-connections" }
completions = { path = "../../common/completions" }
network-defaults = { path = "../../common/network-defaults" }
nymsphinx = { path = "../../common/nymsphinx" }
logging = { path = "../../common/logging"}
network-defaults = { path = "../../common/network-defaults" }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nymsphinx = { path = "../../common/nymsphinx" }
ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
socks5-requests = { path = "../../common/socks5/requests" }
+28 -158
View File
@@ -4,22 +4,16 @@ use crate::allowed_hosts;
use crate::allowed_hosts::OutboundRequestFilter;
use crate::error::NetworkRequesterError;
use crate::statistics::ServiceStatisticsCollector;
use crate::websocket;
use crate::websocket::TSWebsocketStream;
use crate::{reply, socks5};
use client_connections::{
ConnectionCommand, ConnectionCommandReceiver, LaneQueueLengths, TransmissionLane,
};
use client_connections::LaneQueueLengths;
use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
use nymsphinx::receiver::ReconstructedMessage;
use proxy_helpers::connection_controller::{
BroadcastActiveConnections, Controller, ControllerCommand, ControllerSender,
use proxy_helpers::{
connection_controller::{Controller, ControllerCommand, ControllerSender},
proxy_runner::{MixProxyReader, MixProxySender},
};
use proxy_helpers::proxy_runner::{MixProxyReader, MixProxySender};
use socks5_requests::{
ConnectRequest, ConnectionId, Message as Socks5Message, NetworkRequesterResponse, Request,
Response,
@@ -28,14 +22,11 @@ use statistics_common::collector::StatisticsSender;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use task::TaskClient;
use tokio_tungstenite::tungstenite::protocol::Message;
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
// Since it's an atomic, it's safe to be kept static and shared across threads
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
pub struct ServiceProvider {
websocket_address: String,
outbound_request_filter: OutboundRequestFilter,
open_proxy: bool,
enable_statistics: bool,
@@ -44,7 +35,6 @@ pub struct ServiceProvider {
impl ServiceProvider {
pub async fn new(
websocket_address: String,
open_proxy: bool,
enable_statistics: bool,
stats_provider_addr: Option<Recipient>,
@@ -67,7 +57,6 @@ impl ServiceProvider {
let outbound_request_filter = OutboundRequestFilter::new(allowed_hosts, unknown_hosts);
ServiceProvider {
websocket_address,
outbound_request_filter,
open_proxy,
enable_statistics,
@@ -78,10 +67,9 @@ impl ServiceProvider {
/// Listens for any messages from `mix_reader` that should be written back to the mix network
/// via the `websocket_writer`.
async fn mixnet_response_listener(
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mut mix_reader: MixProxyReader<(Socks5Message, reply::ReturnAddress)>,
stats_collector: Option<ServiceStatisticsCollector>,
mut client_connection_rx: ConnectionCommandReceiver,
) {
loop {
tokio::select! {
@@ -102,102 +90,19 @@ impl ServiceProvider {
}
}
// make 'request' to native-websocket client
let conn_id = msg.conn_id();
let response_message = return_address.send_back_to(msg.into_bytes(), conn_id);
let message = Message::Binary(response_message.serialize());
websocket_writer.send(message).await.unwrap();
mixnet_client_sender.send_input_message(response_message).await;
} else {
log::error!("Exiting: channel closed!");
break;
}
},
Some(command) = client_connection_rx.next() => {
match command {
ConnectionCommand::Close(id) => {
let msg = ClientRequest::ClosedConnection(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
ConnectionCommand::ActiveConnections(ids) => {
// We can optimize this by sending a single request, but this is
// usually in the low single digits, max a few tens, so we leave that
// for a rainy day.
// Also that means fiddling with the currently manual
// serialize/deserialize we do with ClientRequests ...
for id in ids {
log::trace!("Requesting lane queue length for: {}", id);
let msg = ClientRequest::GetLaneQueueLength(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
}
}
},
}
}
}
fn handle_lane_queue_length_response(
lane_queue_lengths: &LaneQueueLengths,
lane: u64,
queue_length: usize,
) {
log::trace!("Received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
let lane = TransmissionLane::ConnectionId(lane);
lane_queue_lengths.map.insert(lane, queue_length);
} else {
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
}
}
async fn read_websocket_message(
websocket_reader: &mut SplitStream<TSWebsocketStream>,
lane_queue_lengths: LaneQueueLengths,
) -> Option<ReconstructedMessage> {
while let Some(msg) = websocket_reader.next().await {
let data = match msg {
Ok(msg) => msg.into_data(),
Err(err) => {
log::error!("Failed to read from the websocket: {err}");
continue;
}
};
// try to recover the actual message from the mix network...
let deserialized_message = match ServerResponse::deserialize(&data) {
Ok(deserialized) => deserialized,
Err(err) => {
log::error!(
"Failed to deserialize received websocket message! - {}",
err
);
continue;
}
};
let received = match deserialized_message {
ServerResponse::Received(received) => received,
ServerResponse::LaneQueueLength { lane, queue_length } => {
Self::handle_lane_queue_length_response(
&lane_queue_lengths,
lane,
queue_length,
);
continue;
}
ServerResponse::Error(err) => {
panic!("received error from native client! - {err}")
}
_ => unimplemented!("probably should never be reached?"),
};
return Some(received);
}
None
}
async fn start_proxy(
conn_id: ConnectionId,
remote_addr: String,
@@ -395,10 +300,8 @@ impl ServiceProvider {
/// Start all subsystems
pub async fn run(&mut self) -> Result<(), NetworkRequesterError> {
let websocket_stream = self.connect_websocket(&self.websocket_address).await?;
// split the websocket so that we could read and write from separate threads
let (websocket_writer, mut websocket_reader) = websocket_stream.split();
// Connect to the mixnet
let mut mixnet_client = nym_sdk::mixnet::MixnetClient::connect().await.unwrap();
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
// going to be used by `mixnet_response_listener`
@@ -408,21 +311,9 @@ impl ServiceProvider {
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
let shutdown = task::TaskManager::default();
// Channel for announcing client connection state by the controller.
// The `mixnet_response_listener` will use this to either report closed connection to the
// client or request lane queue lengths.
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented.
let (mut active_connections_controller, mut controller_sender) = Controller::new(
client_connection_tx,
BroadcastActiveConnections::On,
mixnet_client.connection_command_sender(),
shutdown.subscribe(),
);
@@ -446,58 +337,37 @@ impl ServiceProvider {
};
let stats_collector_clone = stats_collector.clone();
let mixnet_client_sender = mixnet_client.sender();
// start the listener for mix messages
tokio::spawn(async move {
Self::mixnet_response_listener(
websocket_writer,
mixnet_client_sender,
mix_input_receiver,
stats_collector_clone,
client_connection_rx,
)
.await;
});
let nym_address = mixnet_client.nym_address();
log::info!("Our nym address is: {nym_address}");
log::info!("All systems go. Press CTRL-C to stop the server.");
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
loop {
let Some(received) = Self::read_websocket_message(
&mut websocket_reader,
shared_lane_queue_lengths.clone()
while let Some(received) = mixnet_client.wait_for_messages().await {
for received in received {
self.handle_proxy_message(
received,
&mut controller_sender,
&mix_input_sender,
mixnet_client.shared_lane_queue_lengths(),
stats_collector.clone(),
shutdown.subscribe(),
)
.await
else {
log::error!("The websocket stream has finished!");
return Err(NetworkRequesterError::ConnectionClosed);
};
self.handle_proxy_message(
received,
&mut controller_sender,
&mix_input_sender,
shared_lane_queue_lengths.clone(),
stats_collector.clone(),
shutdown.subscribe(),
)
.await;
}
}
// Make the websocket connection so we can receive incoming Mixnet messages.
async fn connect_websocket(
&self,
uri: &str,
) -> Result<TSWebsocketStream, NetworkRequesterError> {
match websocket::Connection::new(uri).connect().await {
Ok(ws_stream) => {
log::info!("* connected to local websocket server at {}", uri);
Ok(ws_stream)
}
Err(err) => {
log::error!(
"Error: websocket connection attempt failed, is the Nym client running?"
);
Err(err.into())
.await;
}
}
log::error!("Network requester exited unexpectedly");
Ok(())
}
}
@@ -1,13 +1,5 @@
use crate::websocket::WebsocketConnectionError;
#[derive(thiserror::Error, Debug)]
pub enum NetworkRequesterError {
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),
#[error("Websocket error")]
WebsocketConnectionError(#[from] WebsocketConnectionError),
#[error("Websocket connection closed")]
ConnectionClosed,
}
@@ -16,7 +16,6 @@ mod error;
mod reply;
mod socks5;
mod statistics;
mod websocket;
const ENABLE_STATISTICS: &str = "enable-statistics";
@@ -56,16 +55,8 @@ impl Run {
.transpose()
.unwrap_or(None);
let websocket_address = format!(
"ws://localhost:{}",
self.websocket_port
.as_ref()
.unwrap_or(&network_defaults::DEFAULT_WEBSOCKET_LISTENING_PORT.to_string())
);
log::info!("Starting socks5 service provider");
let mut server = core::ServiceProvider::new(
websocket_address,
self.open_proxy,
self.enable_statistics,
stats_provider_addr,
@@ -1,6 +1,7 @@
use client_connections::TransmissionLane;
use nym_sdk::mixnet::InputMessage;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
use websocket_requests::requests::ClientRequest;
/// A return address is a way to send a message back to the original sender. It can be either
/// an explicitly known Recipient, or a surb AnonymousSenderTag.
@@ -24,17 +25,17 @@ impl ReturnAddress {
None
}
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> ClientRequest {
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> InputMessage {
match self {
ReturnAddress::Known(recipient) => ClientRequest::Send {
ReturnAddress::Known(recipient) => InputMessage::Regular {
recipient: *recipient,
message,
connection_id: Some(connection_id),
data: message,
lane: TransmissionLane::ConnectionId(connection_id),
},
ReturnAddress::Anonymous(sender_tag) => ClientRequest::Reply {
message,
sender_tag,
connection_id: Some(connection_id),
ReturnAddress::Anonymous(sender_tag) => InputMessage::Reply {
recipient_tag: sender_tag,
data: message,
lane: TransmissionLane::ConnectionId(connection_id),
},
}
}
@@ -1,34 +0,0 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream};
#[allow(clippy::upper_case_acronyms)]
pub(crate) type TSWebsocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
pub struct Connection {
uri: String,
}
impl Connection {
pub fn new(uri: &str) -> Connection {
Connection {
uri: String::from(uri),
}
}
pub async fn connect(&self) -> Result<TSWebsocketStream, WebsocketConnectionError> {
match connect_async(&self.uri).await {
Ok((ws_stream, _)) => Ok(ws_stream),
Err(e) => Err(WebsocketConnectionError::ConnectionNotEstablished(e)),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum WebsocketConnectionError {
#[error("Connection not established")]
ConnectionNotEstablished(tokio_tungstenite::tungstenite::Error),
}