Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a9428d4f7b | |||
| 067ee0db74 | |||
| e85974d9a5 | |||
| 000100852b | |||
| f66110eaa9 |
Generated
+1
@@ -3743,6 +3743,7 @@ dependencies = [
|
||||
"log",
|
||||
"logging",
|
||||
"network-defaults",
|
||||
"nym-sdk",
|
||||
"nymsphinx",
|
||||
"ordered-buffer",
|
||||
"pretty_env_logger",
|
||||
|
||||
@@ -51,11 +51,22 @@ pub mod non_wasm_helpers;
|
||||
|
||||
pub mod helpers;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientInput {
|
||||
pub connection_command_sender: ConnectionCommandSender,
|
||||
pub input_sender: InputMessageSender,
|
||||
}
|
||||
|
||||
impl ClientInput {
|
||||
pub async fn send(
|
||||
&self,
|
||||
message: InputMessage,
|
||||
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
|
||||
self.input_sender.send(message).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientOutput {
|
||||
pub received_buffer_request_sender: ReceivedBufferRequestSender,
|
||||
}
|
||||
|
||||
@@ -342,7 +342,6 @@ where
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -421,7 +420,6 @@ where
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use client_core::client::{
|
||||
};
|
||||
use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
|
||||
use proxy_helpers::connection_controller::Controller;
|
||||
use std::net::SocketAddr;
|
||||
use tap::TapFallible;
|
||||
use task::TaskClient;
|
||||
@@ -69,7 +69,7 @@ impl SphinxSocksServer {
|
||||
// controller for managing all active connections
|
||||
let (mut active_streams_controller, controller_sender) = Controller::new(
|
||||
client_connection_tx,
|
||||
BroadcastActiveConnections::Off,
|
||||
//BroadcastActiveConnections::Off,
|
||||
self.shutdown.clone(),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -25,12 +25,6 @@ pub enum ConnectionCommand {
|
||||
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
|
||||
// transmission lanes.
|
||||
Close(ConnectionId),
|
||||
|
||||
// In the network requester for example, we usually want to broadcast active connections
|
||||
// regularly, so we know what connections we need to request lane queue lengths for from the
|
||||
// client.
|
||||
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
|
||||
ActiveConnections(Vec<ConnectionId>),
|
||||
}
|
||||
|
||||
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||
|
||||
@@ -7,12 +7,8 @@ use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use task::TaskClient;
|
||||
use tokio::time;
|
||||
|
||||
/// A generic message produced after reading from a socket/connection. It includes data that was
|
||||
/// actually read alongside boolean indicating whether the connection got closed so that
|
||||
@@ -87,10 +83,6 @@ pub struct Controller {
|
||||
// Broadcast closed connections
|
||||
client_connection_tx: ConnectionCommandSender,
|
||||
|
||||
// The controller can broadcast active connections. This is useful in the network-requester
|
||||
// where its used to query the client for lane queue lengths
|
||||
broadcast_connections: BroadcastActiveConnections,
|
||||
|
||||
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
|
||||
// how to handle it more gracefully
|
||||
|
||||
@@ -104,7 +96,6 @@ pub struct Controller {
|
||||
impl Controller {
|
||||
pub fn new(
|
||||
client_connection_tx: ConnectionCommandSender,
|
||||
broadcast_connections: BroadcastActiveConnections,
|
||||
shutdown: TaskClient,
|
||||
) -> (Self, ControllerSender) {
|
||||
let (sender, receiver) = mpsc::unbounded();
|
||||
@@ -114,7 +105,6 @@ impl Controller {
|
||||
receiver,
|
||||
recently_closed: HashSet::new(),
|
||||
client_connection_tx,
|
||||
broadcast_connections,
|
||||
pending_messages: HashMap::new(),
|
||||
shutdown,
|
||||
},
|
||||
@@ -165,15 +155,6 @@ impl Controller {
|
||||
}
|
||||
}
|
||||
|
||||
fn broadcast_active_connections(&mut self) {
|
||||
// What about the recently closed ones? Hopefully we can ignore them ...
|
||||
let conn_ids = self.active_connections.keys().copied().collect();
|
||||
|
||||
self.client_connection_tx
|
||||
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
|
||||
if !payload.is_empty() {
|
||||
@@ -230,8 +211,6 @@ impl Controller {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
let mut interval = time::interval(Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = self.receiver.next() => match command {
|
||||
@@ -247,11 +226,6 @@ impl Controller {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
if self.broadcast_connections == BroadcastActiveConnections::On {
|
||||
self.broadcast_active_connections();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
self.shutdown.recv_timeout().await;
|
||||
|
||||
@@ -36,8 +36,8 @@ mod connection_state;
|
||||
mod keys;
|
||||
mod paths;
|
||||
|
||||
pub use client::{MixnetClient, MixnetClientBuilder};
|
||||
pub use client_core::config::GatewayEndpointConfig;
|
||||
pub use client::{MixnetClient, MixnetClientBuilder, MixnetClientSender};
|
||||
pub use client_core::{client::inbound_messages::InputMessage, config::GatewayEndpointConfig};
|
||||
pub use config::Config;
|
||||
pub use keys::{Keys, KeysArc};
|
||||
pub use nymsphinx::{
|
||||
|
||||
@@ -268,7 +268,6 @@ pub struct MixnetClient {
|
||||
|
||||
/// The current state of the client that is exposed to the user. This includes things like
|
||||
/// current message send queue length.
|
||||
#[allow(dead_code)]
|
||||
client_state: ClientState,
|
||||
|
||||
/// A channel for messages arriving from the mixnet after they have been reconstructed.
|
||||
@@ -421,6 +420,23 @@ impl MixnetClient {
|
||||
&self.nym_address
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`MixnetClientSender`]
|
||||
pub fn sender(&self) -> MixnetClientSender {
|
||||
MixnetClientSender {
|
||||
client_input: self.client_input.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`ConnectionCommandSender`].
|
||||
pub fn connection_command_sender(&self) -> client_connections::ConnectionCommandSender {
|
||||
self.client_input.connection_command_sender.clone()
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`LaneQueueLengths`].
|
||||
pub fn shared_lane_queue_lengths(&self) -> client_connections::LaneQueueLengths {
|
||||
self.client_state.shared_lane_queue_lengths.clone()
|
||||
}
|
||||
|
||||
/// Sends stringy data to the supplied Nym address
|
||||
pub async fn send_str(&self, address: Recipient, message: &str) {
|
||||
let message_bytes = message.to_string().into_bytes();
|
||||
@@ -430,7 +446,7 @@ impl MixnetClient {
|
||||
/// Sends stringy data to the supplied Nym address, and skip sending reply-SURBs
|
||||
pub async fn send_str_direct(&self, address: Recipient, message: &str) {
|
||||
let message_bytes = message.to_string().into_bytes();
|
||||
self.send_bytes(address, message_bytes).await;
|
||||
self.send_bytes_direct(address, message_bytes).await;
|
||||
}
|
||||
|
||||
/// Sends bytes to the supplied Nym address
|
||||
@@ -451,13 +467,12 @@ impl MixnetClient {
|
||||
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>) {
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = InputMessage::new_anonymous(address, message, 20, lane);
|
||||
if self
|
||||
.client_input
|
||||
.input_sender
|
||||
.send(input_msg)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
self.send_input_message(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet.
|
||||
async fn send_input_message(&self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
}
|
||||
}
|
||||
@@ -501,3 +516,15 @@ impl MixnetClient {
|
||||
self.task_manager.wait_for_shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MixnetClientSender {
|
||||
client_input: ClientInput,
|
||||
}
|
||||
|
||||
impl MixnetClientSender {
|
||||
pub async fn send_input_message(&mut self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,9 +31,10 @@ tokio-tungstenite = "0.17.2"
|
||||
# internal
|
||||
client-connections = { path = "../../common/client-connections" }
|
||||
completions = { path = "../../common/completions" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
logging = { path = "../../common/logging"}
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
|
||||
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
|
||||
socks5-requests = { path = "../../common/socks5/requests" }
|
||||
|
||||
@@ -4,22 +4,16 @@ use crate::allowed_hosts;
|
||||
use crate::allowed_hosts::OutboundRequestFilter;
|
||||
use crate::error::NetworkRequesterError;
|
||||
use crate::statistics::ServiceStatisticsCollector;
|
||||
use crate::websocket;
|
||||
use crate::websocket::TSWebsocketStream;
|
||||
use crate::{reply, socks5};
|
||||
use client_connections::{
|
||||
ConnectionCommand, ConnectionCommandReceiver, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use client_connections::LaneQueueLengths;
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use proxy_helpers::connection_controller::{
|
||||
BroadcastActiveConnections, Controller, ControllerCommand, ControllerSender,
|
||||
use proxy_helpers::{
|
||||
connection_controller::{Controller, ControllerCommand, ControllerSender},
|
||||
proxy_runner::{MixProxyReader, MixProxySender},
|
||||
};
|
||||
use proxy_helpers::proxy_runner::{MixProxyReader, MixProxySender};
|
||||
use socks5_requests::{
|
||||
ConnectRequest, ConnectionId, Message as Socks5Message, NetworkRequesterResponse, Request,
|
||||
Response,
|
||||
@@ -28,14 +22,11 @@ use statistics_common::collector::StatisticsSender;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use task::TaskClient;
|
||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
||||
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
|
||||
|
||||
// Since it's an atomic, it's safe to be kept static and shared across threads
|
||||
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub struct ServiceProvider {
|
||||
websocket_address: String,
|
||||
outbound_request_filter: OutboundRequestFilter,
|
||||
open_proxy: bool,
|
||||
enable_statistics: bool,
|
||||
@@ -44,7 +35,6 @@ pub struct ServiceProvider {
|
||||
|
||||
impl ServiceProvider {
|
||||
pub async fn new(
|
||||
websocket_address: String,
|
||||
open_proxy: bool,
|
||||
enable_statistics: bool,
|
||||
stats_provider_addr: Option<Recipient>,
|
||||
@@ -67,7 +57,6 @@ impl ServiceProvider {
|
||||
|
||||
let outbound_request_filter = OutboundRequestFilter::new(allowed_hosts, unknown_hosts);
|
||||
ServiceProvider {
|
||||
websocket_address,
|
||||
outbound_request_filter,
|
||||
open_proxy,
|
||||
enable_statistics,
|
||||
@@ -78,10 +67,9 @@ impl ServiceProvider {
|
||||
/// Listens for any messages from `mix_reader` that should be written back to the mix network
|
||||
/// via the `websocket_writer`.
|
||||
async fn mixnet_response_listener(
|
||||
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
|
||||
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mut mix_reader: MixProxyReader<(Socks5Message, reply::ReturnAddress)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
mut client_connection_rx: ConnectionCommandReceiver,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -102,102 +90,19 @@ impl ServiceProvider {
|
||||
}
|
||||
}
|
||||
|
||||
// make 'request' to native-websocket client
|
||||
let conn_id = msg.conn_id();
|
||||
let response_message = return_address.send_back_to(msg.into_bytes(), conn_id);
|
||||
|
||||
let message = Message::Binary(response_message.serialize());
|
||||
websocket_writer.send(message).await.unwrap();
|
||||
mixnet_client_sender.send_input_message(response_message).await;
|
||||
} else {
|
||||
log::error!("Exiting: channel closed!");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some(command) = client_connection_rx.next() => {
|
||||
match command {
|
||||
ConnectionCommand::Close(id) => {
|
||||
let msg = ClientRequest::ClosedConnection(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
ConnectionCommand::ActiveConnections(ids) => {
|
||||
// We can optimize this by sending a single request, but this is
|
||||
// usually in the low single digits, max a few tens, so we leave that
|
||||
// for a rainy day.
|
||||
// Also that means fiddling with the currently manual
|
||||
// serialize/deserialize we do with ClientRequests ...
|
||||
for id in ids {
|
||||
log::trace!("Requesting lane queue length for: {}", id);
|
||||
let msg = ClientRequest::GetLaneQueueLength(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_lane_queue_length_response(
|
||||
lane_queue_lengths: &LaneQueueLengths,
|
||||
lane: u64,
|
||||
queue_length: usize,
|
||||
) {
|
||||
log::trace!("Received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
|
||||
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
|
||||
let lane = TransmissionLane::ConnectionId(lane);
|
||||
lane_queue_lengths.map.insert(lane, queue_length);
|
||||
} else {
|
||||
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_websocket_message(
|
||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Option<ReconstructedMessage> {
|
||||
while let Some(msg) = websocket_reader.next().await {
|
||||
let data = match msg {
|
||||
Ok(msg) => msg.into_data(),
|
||||
Err(err) => {
|
||||
log::error!("Failed to read from the websocket: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// try to recover the actual message from the mix network...
|
||||
let deserialized_message = match ServerResponse::deserialize(&data) {
|
||||
Ok(deserialized) => deserialized,
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Failed to deserialize received websocket message! - {}",
|
||||
err
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let received = match deserialized_message {
|
||||
ServerResponse::Received(received) => received,
|
||||
ServerResponse::LaneQueueLength { lane, queue_length } => {
|
||||
Self::handle_lane_queue_length_response(
|
||||
&lane_queue_lengths,
|
||||
lane,
|
||||
queue_length,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
ServerResponse::Error(err) => {
|
||||
panic!("received error from native client! - {err}")
|
||||
}
|
||||
_ => unimplemented!("probably should never be reached?"),
|
||||
};
|
||||
return Some(received);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn start_proxy(
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
@@ -395,10 +300,8 @@ impl ServiceProvider {
|
||||
|
||||
/// Start all subsystems
|
||||
pub async fn run(&mut self) -> Result<(), NetworkRequesterError> {
|
||||
let websocket_stream = self.connect_websocket(&self.websocket_address).await?;
|
||||
|
||||
// split the websocket so that we could read and write from separate threads
|
||||
let (websocket_writer, mut websocket_reader) = websocket_stream.split();
|
||||
// Connect to the mixnet
|
||||
let mut mixnet_client = nym_sdk::mixnet::MixnetClient::connect().await.unwrap();
|
||||
|
||||
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
|
||||
// going to be used by `mixnet_response_listener`
|
||||
@@ -408,21 +311,9 @@ impl ServiceProvider {
|
||||
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
|
||||
let shutdown = task::TaskManager::default();
|
||||
|
||||
// Channel for announcing client connection state by the controller.
|
||||
// The `mixnet_response_listener` will use this to either report closed connection to the
|
||||
// client or request lane queue lengths.
|
||||
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
// requester shutdown signalling is not yet fully implemented.
|
||||
let (mut active_connections_controller, mut controller_sender) = Controller::new(
|
||||
client_connection_tx,
|
||||
BroadcastActiveConnections::On,
|
||||
mixnet_client.connection_command_sender(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
@@ -446,58 +337,37 @@ impl ServiceProvider {
|
||||
};
|
||||
|
||||
let stats_collector_clone = stats_collector.clone();
|
||||
let mixnet_client_sender = mixnet_client.sender();
|
||||
|
||||
// start the listener for mix messages
|
||||
tokio::spawn(async move {
|
||||
Self::mixnet_response_listener(
|
||||
websocket_writer,
|
||||
mixnet_client_sender,
|
||||
mix_input_receiver,
|
||||
stats_collector_clone,
|
||||
client_connection_rx,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
let nym_address = mixnet_client.nym_address();
|
||||
log::info!("Our nym address is: {nym_address}");
|
||||
log::info!("All systems go. Press CTRL-C to stop the server.");
|
||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||
loop {
|
||||
let Some(received) = Self::read_websocket_message(
|
||||
&mut websocket_reader,
|
||||
shared_lane_queue_lengths.clone()
|
||||
|
||||
while let Some(received) = mixnet_client.wait_for_messages().await {
|
||||
for received in received {
|
||||
self.handle_proxy_message(
|
||||
received,
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
mixnet_client.shared_lane_queue_lengths(),
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await
|
||||
else {
|
||||
log::error!("The websocket stream has finished!");
|
||||
return Err(NetworkRequesterError::ConnectionClosed);
|
||||
};
|
||||
|
||||
self.handle_proxy_message(
|
||||
received,
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Make the websocket connection so we can receive incoming Mixnet messages.
|
||||
async fn connect_websocket(
|
||||
&self,
|
||||
uri: &str,
|
||||
) -> Result<TSWebsocketStream, NetworkRequesterError> {
|
||||
match websocket::Connection::new(uri).connect().await {
|
||||
Ok(ws_stream) => {
|
||||
log::info!("* connected to local websocket server at {}", uri);
|
||||
Ok(ws_stream)
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Error: websocket connection attempt failed, is the Nym client running?"
|
||||
);
|
||||
Err(err.into())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
log::error!("Network requester exited unexpectedly");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,5 @@
|
||||
use crate::websocket::WebsocketConnectionError;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum NetworkRequesterError {
|
||||
#[error("I/O error: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("Websocket error")]
|
||||
WebsocketConnectionError(#[from] WebsocketConnectionError),
|
||||
|
||||
#[error("Websocket connection closed")]
|
||||
ConnectionClosed,
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ mod error;
|
||||
mod reply;
|
||||
mod socks5;
|
||||
mod statistics;
|
||||
mod websocket;
|
||||
|
||||
const ENABLE_STATISTICS: &str = "enable-statistics";
|
||||
|
||||
@@ -56,16 +55,8 @@ impl Run {
|
||||
.transpose()
|
||||
.unwrap_or(None);
|
||||
|
||||
let websocket_address = format!(
|
||||
"ws://localhost:{}",
|
||||
self.websocket_port
|
||||
.as_ref()
|
||||
.unwrap_or(&network_defaults::DEFAULT_WEBSOCKET_LISTENING_PORT.to_string())
|
||||
);
|
||||
|
||||
log::info!("Starting socks5 service provider");
|
||||
let mut server = core::ServiceProvider::new(
|
||||
websocket_address,
|
||||
self.open_proxy,
|
||||
self.enable_statistics,
|
||||
stats_provider_addr,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use client_connections::TransmissionLane;
|
||||
use nym_sdk::mixnet::InputMessage;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use websocket_requests::requests::ClientRequest;
|
||||
|
||||
/// A return address is a way to send a message back to the original sender. It can be either
|
||||
/// an explicitly known Recipient, or a surb AnonymousSenderTag.
|
||||
@@ -24,17 +25,17 @@ impl ReturnAddress {
|
||||
None
|
||||
}
|
||||
|
||||
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> ClientRequest {
|
||||
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> InputMessage {
|
||||
match self {
|
||||
ReturnAddress::Known(recipient) => ClientRequest::Send {
|
||||
ReturnAddress::Known(recipient) => InputMessage::Regular {
|
||||
recipient: *recipient,
|
||||
message,
|
||||
connection_id: Some(connection_id),
|
||||
data: message,
|
||||
lane: TransmissionLane::ConnectionId(connection_id),
|
||||
},
|
||||
ReturnAddress::Anonymous(sender_tag) => ClientRequest::Reply {
|
||||
message,
|
||||
sender_tag,
|
||||
connection_id: Some(connection_id),
|
||||
ReturnAddress::Anonymous(sender_tag) => InputMessage::Reply {
|
||||
recipient_tag: sender_tag,
|
||||
data: message,
|
||||
lane: TransmissionLane::ConnectionId(connection_id),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::{connect_async, MaybeTlsStream};
|
||||
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub(crate) type TSWebsocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
|
||||
pub struct Connection {
|
||||
uri: String,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(uri: &str) -> Connection {
|
||||
Connection {
|
||||
uri: String::from(uri),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&self) -> Result<TSWebsocketStream, WebsocketConnectionError> {
|
||||
match connect_async(&self.uri).await {
|
||||
Ok((ws_stream, _)) => Ok(ws_stream),
|
||||
Err(e) => Err(WebsocketConnectionError::ConnectionNotEstablished(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WebsocketConnectionError {
|
||||
#[error("Connection not established")]
|
||||
ConnectionNotEstablished(tokio_tungstenite::tungstenite::Error),
|
||||
}
|
||||
Reference in New Issue
Block a user