Compare commits

...

10 Commits

Author SHA1 Message Date
Jon Häggblad 53f3b0f607 WIP 2022-11-09 18:51:27 +01:00
Jon Häggblad a71508fdd7 group real messages a little 2022-11-09 15:40:08 +01:00
Jon Häggblad 98763d8a80 prio 2022-11-09 12:46:59 +01:00
Jon Häggblad 7da4d9439d usable 2022-11-09 11:55:06 +01:00
Jon Häggblad 7f0595e295 log statement 2022-11-09 11:29:28 +01:00
Jon Häggblad 63d0c74ddd WIP: compiles 2022-11-09 11:29:28 +01:00
Jon Häggblad 7e6b97fd64 WIP 2022-11-09 11:29:28 +01:00
Jon Häggblad 6ba7c74dac WIP 2022-11-09 11:29:28 +01:00
Jon Häggblad 3de9895203 WIP: note 2022-11-09 11:29:28 +01:00
Jon Häggblad 006767d62c WIP: socks5 done, next other side 2022-11-09 11:29:27 +01:00
23 changed files with 778 additions and 128 deletions
Generated
+20 -10
View File
@@ -131,9 +131,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.53"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
dependencies = [
"proc-macro2",
"quote",
@@ -588,6 +588,7 @@ dependencies = [
"gateway-requests",
"gloo-timers",
"humantime-serde",
"itertools",
"log",
"nonexhaustive-delayqueue",
"nymsphinx",
@@ -2530,9 +2531,9 @@ dependencies = [
[[package]]
name = "itertools"
version = "0.10.3"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
@@ -4012,11 +4013,11 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.37"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
dependencies = [
"unicode-xid",
"unicode-ident",
]
[[package]]
@@ -4122,10 +4123,13 @@ dependencies = [
name = "proxy-helpers"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"client-core",
"futures",
"log",
"ordered-buffer",
"rand 0.7.3",
"socks5-requests",
"task",
"tokio",
@@ -5582,13 +5586,13 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.91"
version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"unicode-ident",
]
[[package]]
@@ -6310,6 +6314,12 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f"
[[package]]
name = "unicode-ident"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "unicode-normalization"
version = "0.1.9"
+2 -1
View File
@@ -31,6 +31,7 @@ validator-client = { path = "../../common/client-libs/validator-client", default
tap = "1.0.1"
tokio = { version = "1.21.2", features = ["time", "macros"]}
itertools = "0.10.5"
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-bindgen-futures]
version = "0.4"
@@ -56,4 +57,4 @@ tempfile = "3.1.0"
default = ["reply-surb"]
wasm = ["gateway-client/wasm"]
coconut = ["gateway-client/coconut", "gateway-requests/coconut"]
reply-surb = ["sled"]
reply-surb = ["sled"]
@@ -2,8 +2,8 @@ use futures::channel::mpsc;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::ReplySurb;
pub type InputMessageSender = mpsc::UnboundedSender<InputMessage>;
pub type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
#[derive(Debug)]
pub enum InputMessage {
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
}
async fn on_ack(&mut self, ack_content: Vec<u8>) {
debug!("Received an ack");
trace!("Received an ack");
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
{
@@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
pub(crate) type ActionSender = UnboundedSender<Action>;
pub type ActionSender = UnboundedSender<Action>;
// The actual data being sent off as well as potential key to the delay queue
type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
@@ -23,7 +23,7 @@ type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
// - received an ack so we want to remove an entry
// - start a retransmission timer for sending the packet into the network (on either first try or retransmission)
// - update the internal sphinx delay of an expired packet
pub(crate) enum Action {
pub enum Action {
/// Inserts new `PendingAcknowledgement`s into the 'shared' state.
/// Initiated by `InputMessageListener`
InsertPending(Vec<PendingAcknowledgement>),
@@ -44,7 +44,7 @@ pub(crate) enum Action {
}
impl Action {
pub(crate) fn new_insert(pending_acks: Vec<PendingAcknowledgement>) -> Self {
pub fn new_insert(pending_acks: Vec<PendingAcknowledgement>) -> Self {
Action::InsertPending(pending_acks)
}
@@ -15,6 +15,7 @@ use nymsphinx::preparer::MessagePreparer;
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
use rand::{CryptoRng, Rng};
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "reply-surb")]
use crate::client::reply_key_storage::ReplyKeyStorage;
@@ -41,6 +42,10 @@ impl<R> InputMessageListener<R>
where
R: CryptoRng + Rng,
{
pub fn get_action_sender(&self) -> ActionSender {
self.action_sender.clone()
}
// at this point I'm not entirely sure how to deal with this warning without
// some considerable refactoring
#[allow(clippy::too_many_arguments)]
@@ -182,9 +187,9 @@ where
// there's no point in trying to send nothing
if let Some(real_messages) = real_messages {
// tells real message sender (with the poisson timer) to send this to the mix network
self.real_message_sender
.unbounded_send(real_messages)
.unwrap();
if self.real_message_sender.send(real_messages).await.is_err() {
panic!();
}
}
}
@@ -194,7 +199,7 @@ where
while !shutdown.is_shutdown() {
tokio::select! {
input_msg = self.input_receiver.next() => match input_msg {
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
},
@@ -220,3 +225,194 @@ where
}
}
}
#[derive(Clone)]
pub struct FreshInputMessageChunker<R>
where
R: CryptoRng + Rng,
{
ack_key: Arc<AckKey>,
ack_recipient: Recipient,
message_preparer: MessagePreparer<R>,
action_sender: ActionSender,
real_message_sender: BatchRealMessageSender,
topology_access: TopologyAccessor,
}
impl<R> FreshInputMessageChunker<R>
where
R: CryptoRng + Rng,
{
pub fn new(
ack_key: Arc<AckKey>,
ack_recipient: Recipient,
message_preparer: MessagePreparer<R>,
action_sender: ActionSender,
real_message_sender: BatchRealMessageSender,
topology_access: TopologyAccessor,
) -> Self {
Self {
ack_key,
ack_recipient,
message_preparer,
action_sender,
real_message_sender,
topology_access,
}
}
// we require topology for replies to generate surb_acks
//async fn handle_reply(&mut self, reply_surb: ReplySurb, data: Vec<u8>) -> Option<RealMessage> {
// let topology_permit = self.topology_access.get_read_permit().await;
// let topology = match topology_permit.try_get_valid_topology_ref(&self.ack_recipient, None) {
// Some(topology_ref) => topology_ref,
// None => {
// warn!("Could not process the message - the network topology is invalid");
// return None;
// }
// };
// match self
// .message_preparer
// .prepare_reply_for_use(data, reply_surb, topology, &self.ack_key)
// .await
// {
// Ok((mix_packet, reply_id)) => {
// // TODO: later probably write pending ack here
// // and deal with them....
// // ... somehow
// Some(RealMessage::new(mix_packet, reply_id))
// }
// Err(err) => {
// // TODO: should we have some mechanism to indicate to the user that the `reply_surb`
// // could be reused since technically it wasn't used up here?
// warn!("failed to deal with received reply surb - {:?}", err);
// None
// }
// }
//}
async fn handle_fresh_message(
&mut self,
recipient: Recipient,
content: Vec<u8>,
with_reply_surb: bool,
) -> Option<Vec<RealMessage>> {
let topology_permit = self.topology_access.get_read_permit().await;
let topology = match topology_permit
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
{
Some(topology_ref) => topology_ref,
None => {
warn!("Could not process the message - the network topology is invalid");
return None;
}
};
// split the message, attach optional reply surb
let (split_message, reply_key) = self
.message_preparer
.prepare_and_split_message(content, with_reply_surb, topology)
.expect("somehow the topology was invalid after all!");
//#[cfg(feature = "reply-surb")]
//if let Some(reply_key) = reply_key {
// self.reply_key_storage
// .insert_encryption_key(reply_key)
// .expect("Failed to insert surb reply key to the store!")
//}
#[cfg(not(feature = "reply-surb"))]
let _reply_key = reply_key;
// encrypt chunks, put them inside sphinx packets and generate acks
let mut pending_acks = Vec::with_capacity(split_message.len());
let mut real_messages = Vec::with_capacity(split_message.len());
for message_chunk in split_message {
// we need to clone it because we need to keep it in memory in case we had to retransmit
// it. And then we'd need to recreate entire ACK again.
let chunk_clone = message_chunk.clone();
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(chunk_clone, topology, &self.ack_key, &recipient)
.unwrap();
real_messages.push(RealMessage::new(
prepared_fragment.mix_packet,
message_chunk.fragment_identifier(),
));
pending_acks.push(PendingAcknowledgement::new(
message_chunk,
prepared_fragment.total_delay,
recipient,
));
}
// tells the controller to put this into the hashmap
self.action_sender
.unbounded_send(Action::new_insert(pending_acks))
.unwrap();
Some(real_messages)
}
pub async fn on_input_message(&mut self, msg: InputMessage) {
log::info!("Using new chunker!");
let real_messages = match msg {
InputMessage::Fresh {
recipient,
data,
with_reply_surb,
} => {
self.handle_fresh_message(recipient, data, with_reply_surb)
.await
}
InputMessage::Reply { .. } => panic!(),
//InputMessage::Reply { reply_surb, data } => self
// .handle_reply(reply_surb, data)
// .await
// .map(|message| vec![message]),
};
// there's no point in trying to send nothing
if let Some(real_messages) = real_messages {
// tells real message sender (with the poisson timer) to send this to the mix network
use itertools::Itertools;
log::info!("chunked into {} messages", real_messages.len());
log::info!("current capacity: {}", self.real_message_sender.capacity());
if real_messages.len() > 10 {
// only send if there is nothing in queue
log::info!("sending large message(s)");
//let grouped: Vec<Vec<RealMessage>> = real_messages
// .into_iter()
// .chunks(10)
// .into_iter()
// .map(|c| c.collect())
// .collect();
for msg in real_messages {
let msgs = vec![msg];
//let msgs = msg.to_vec();
loop {
if self.real_message_sender.capacity() > 2 {
if self.real_message_sender.send(msgs).await.is_err() {
panic!();
}
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
log::error!("finished sending large message(s)");
} else {
log::info!("sending small message(s)");
if self.real_message_sender.send(real_messages).await.is_err() {
panic!();
}
}
}
}
}
@@ -1,6 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use self::action_controller::ActionSender;
use self::input_message_listener::FreshInputMessageChunker;
use self::{
acknowledgement_listener::AcknowledgementListener, action_controller::ActionController,
input_message_listener::InputMessageListener,
@@ -31,8 +33,8 @@ use std::{
use crate::client::reply_key_storage::ReplyKeyStorage;
mod acknowledgement_listener;
mod action_controller;
mod input_message_listener;
pub mod action_controller;
pub mod input_message_listener;
mod retransmission_request_listener;
mod sent_notification_listener;
@@ -52,7 +54,7 @@ type SentPacketNotificationReceiver = mpsc::UnboundedReceiver<FragmentIdentifier
/// Structure representing a data `Fragment` that is on-route to the specified `Recipient`
#[derive(Debug)]
pub(crate) struct PendingAcknowledgement {
pub struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
@@ -60,7 +62,7 @@ pub(crate) struct PendingAcknowledgement {
impl PendingAcknowledgement {
/// Creates new instance of `PendingAcknowledgement` using the provided data.
fn new(message_chunk: Fragment, delay: SphinxDelay, recipient: Recipient) -> Self {
pub fn new(message_chunk: Fragment, delay: SphinxDelay, recipient: Recipient) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
@@ -110,7 +112,7 @@ impl AcknowledgementControllerConnectors {
}
/// Configurable parameters of the `AcknowledgementController`
pub(super) struct Config {
pub struct Config {
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
ack_wait_addition: Duration,
@@ -118,17 +120,17 @@ pub(super) struct Config {
ack_wait_multiplier: f64,
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
average_ack_delay: Duration,
pub average_ack_delay: Duration,
/// Average delay a data packet is going to get delayed at a single mixnode.
average_packet_delay: Duration,
pub average_packet_delay: Duration,
/// Predefined packet size used for the encapsulated messages.
packet_size: PacketSize,
pub packet_size: PacketSize,
}
impl Config {
pub(super) fn new(
pub fn new(
ack_wait_addition: Duration,
ack_wait_multiplier: f64,
average_ack_delay: Duration,
@@ -155,6 +157,7 @@ where
{
acknowledgement_listener: AcknowledgementListener,
input_message_listener: InputMessageListener<R>,
pub fresh_input_msg_chunker: FreshInputMessageChunker<R>,
retransmission_request_listener: RetransmissionRequestListener<R>,
sent_notification_listener: SentNotificationListener,
action_controller: ActionController,
@@ -164,6 +167,14 @@ impl<R> AcknowledgementController<R>
where
R: 'static + CryptoRng + Rng + Clone + Send,
{
pub fn get_action_sender(&self) -> ActionSender {
self.input_message_listener.get_action_sender()
}
//pub fn get_real_message_sender(&self) -> BatchRealMessageSender {
//self.input_message_listener.get_real_message_sender()
//}
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
config: Config,
@@ -209,6 +220,15 @@ where
reply_key_storage,
);
let fresh_input_msg_chunker = FreshInputMessageChunker::new(
Arc::clone(&ack_key),
ack_recipient,
message_preparer.clone(),
action_sender.clone(),
connectors.real_message_sender.clone(),
topology_access.clone(),
);
// will listen for any ack timeouts and trigger retransmission
let retransmission_request_listener = RetransmissionRequestListener::new(
Arc::clone(&ack_key),
@@ -228,6 +248,7 @@ where
AcknowledgementController {
acknowledgement_listener,
input_message_listener,
fresh_input_msg_chunker,
retransmission_request_listener,
sent_notification_listener,
action_controller,
@@ -112,12 +112,17 @@ where
.unwrap();
// send to `OutQueueControl` to eventually send to the mix network
self.real_message_sender
.unbounded_send(vec![RealMessage::new(
if self
.real_message_sender
.send(vec![RealMessage::new(
prepared_fragment.mix_packet,
frag_id,
)])
.unwrap();
.await
.is_err()
{
panic!();
}
}
#[cfg(not(target_arch = "wasm32"))]
@@ -5,6 +5,8 @@
// INPUT2: Acks from mix
// OUTPUT: MixMessage to mix traffic
use self::acknowledgement_control::action_controller::ActionSender;
use self::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
@@ -27,8 +29,8 @@ use std::time::Duration;
#[cfg(feature = "reply-surb")]
use crate::client::reply_key_storage::ReplyKeyStorage;
mod acknowledgement_control;
mod real_traffic_stream;
pub mod acknowledgement_control;
pub mod real_traffic_stream;
// TODO: ack_key and self_recipient shouldn't really be part of this config
pub struct Config {
@@ -36,10 +38,10 @@ pub struct Config {
ack_key: Arc<AckKey>,
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
ack_wait_addition: Duration,
pub ack_wait_addition: Duration,
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the multiplier `a`
ack_wait_multiplier: f64,
pub ack_wait_multiplier: f64,
/// Address of `this` client.
self_recipient: Recipient,
@@ -48,17 +50,17 @@ pub struct Config {
average_message_sending_delay: Duration,
/// Average delay a data packet is going to get delayed at a single mixnode.
average_packet_delay_duration: Duration,
pub average_packet_delay_duration: Duration,
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
average_ack_delay_duration: Duration,
pub average_ack_delay_duration: Duration,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
disable_main_poisson_packet_distribution: bool,
/// Predefined packet size used for the encapsulated messages.
packet_size: PacketSize,
pub packet_size: PacketSize,
}
impl Config {
@@ -103,6 +105,14 @@ where
// obviously when we finally make shared rng that is on 'higher' level, this should become
// generic `R`
impl RealMessagesController<OsRng> {
pub fn get_action_sender(&self) -> ActionSender {
self.ack_control.get_action_sender()
}
pub fn get_fresh_input_message_chunker(&self) -> FreshInputMessageChunker<OsRng> {
self.ack_control.fresh_input_msg_chunker.clone()
}
pub fn new(
config: Config,
ack_receiver: AcknowledgementReceiver,
@@ -113,7 +123,7 @@ impl RealMessagesController<OsRng> {
) -> Self {
let rng = OsRng;
let (real_message_sender, real_message_receiver) = mpsc::unbounded();
let (real_message_sender, real_message_receiver) = tokio::sync::mpsc::channel(3);
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
@@ -226,13 +226,13 @@ where
received_buffer: VecDeque<RealMessage>,
}
pub(crate) struct RealMessage {
pub struct RealMessage {
mix_packet: MixPacket,
fragment_id: FragmentIdentifier,
}
impl RealMessage {
pub(crate) fn new(mix_packet: MixPacket, fragment_id: FragmentIdentifier) -> Self {
pub fn new(mix_packet: MixPacket, fragment_id: FragmentIdentifier) -> Self {
RealMessage {
mix_packet,
fragment_id,
@@ -242,8 +242,8 @@ impl RealMessage {
// messages are already prepared, etc. the real point of it is to forward it to mix_traffic
// after sufficient delay
pub(crate) type BatchRealMessageSender = mpsc::UnboundedSender<Vec<RealMessage>>;
type BatchRealMessageReceiver = mpsc::UnboundedReceiver<Vec<RealMessage>>;
pub type BatchRealMessageSender = tokio::sync::mpsc::Sender<Vec<RealMessage>>;
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<Vec<RealMessage>>;
pub(crate) enum StreamMessage {
Cover,
@@ -425,7 +425,7 @@ where
}
// decide what kind of message to send
match Pin::new(&mut self.real_receiver).poll_next(cx) {
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
@@ -472,7 +472,7 @@ where
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
match Pin::new(&mut self.real_receiver).poll_next(cx) {
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
}
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
debug!(
trace!(
"Processing {:?} new message that might get added to the buffer!",
msgs.len()
);
@@ -50,7 +50,7 @@ impl<'a> Deref for TopologyReadPermit<'a> {
impl<'a> TopologyReadPermit<'a> {
/// Using provided topology read permit, tries to get an immutable reference to the underlying
/// topology. For obvious reasons the lifetime of the topology reference is bound to the permit.
pub(super) fn try_get_valid_topology_ref(
pub fn try_get_valid_topology_ref(
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
+48 -19
View File
@@ -8,6 +8,7 @@ use client_core::client::inbound_messages::{
use client_core::client::key_manager::KeyManager;
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use client_core::client::real_messages_control;
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use client_core::client::real_messages_control::RealMessagesController;
use client_core::client::received_buffer::{
ReceivedBufferMessage, ReceivedBufferRequestReceiver, ReceivedBufferRequestSender,
@@ -31,6 +32,7 @@ use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use nymsphinx::anonymous_replies::ReplySurb;
use nymsphinx::receiver::ReconstructedMessage;
use rand::rngs::OsRng;
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use crate::client::config::{Config, SocketType};
@@ -118,7 +120,7 @@ impl NymClient {
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
) -> FreshInputMessageChunker<OsRng> {
let mut controller_config = real_messages_control::Config::new(
self.key_manager.ack_key(),
self.config.get_base().get_ack_wait_multiplier(),
@@ -139,15 +141,20 @@ impl NymClient {
info!("Starting real traffic stream...");
RealMessagesController::new(
let real_message_controller = RealMessagesController::new(
controller_config,
ack_receiver,
input_receiver,
mix_sender,
topology_accessor,
reply_key_storage,
)
.start_with_shutdown(shutdown);
);
let fresh_input_msg_chunker = real_message_controller.get_fresh_input_message_chunker();
real_message_controller.start_with_shutdown(shutdown);
fresh_input_msg_chunker
}
// buffer controlling all messages fetched from provider
@@ -279,11 +286,16 @@ impl NymClient {
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
) {
info!("Starting websocket listener...");
let websocket_handler =
websocket::Handler::new(msg_input, buffer_requester, self.as_mix_recipient());
let websocket_handler = websocket::Handler::new(
msg_input,
buffer_requester,
self.as_mix_recipient(),
fresh_input_msg_chunker,
);
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
}
@@ -291,27 +303,42 @@ impl NymClient {
/// EXPERIMENTAL DIRECT RUST API
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
/// well enough in local tests)
pub fn send_message(&mut self, recipient: Recipient, message: Vec<u8>, with_reply_surb: bool) {
pub async fn send_message(
&mut self,
recipient: Recipient,
message: Vec<u8>,
with_reply_surb: bool,
) {
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
self.input_tx
if self
.input_tx
.as_ref()
.expect("start method was not called before!")
.unbounded_send(input_msg)
.unwrap();
.send(input_msg)
.await
.is_err()
{
panic!();
}
}
/// EXPERIMENTAL DIRECT RUST API
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
/// well enough in local tests)
pub fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
pub async fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
let input_msg = InputMessage::new_reply(reply_surb, message);
self.input_tx
if self
.input_tx
.as_ref()
.expect("start method was not called before!")
.unbounded_send(input_msg)
.unwrap();
.send(input_msg)
.await
.is_err()
{
panic!();
}
}
/// EXPERIMENTAL DIRECT RUST API
@@ -368,7 +395,7 @@ impl NymClient {
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
// channels responsible for controlling real messages
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(3);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
@@ -403,7 +430,7 @@ impl NymClient {
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
self.start_real_traffic_controller(
let fresh_input_msg_chunker = self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
@@ -425,9 +452,11 @@ impl NymClient {
}
match self.config.get_socket_type() {
SocketType::WebSocket => {
self.start_websocket_listener(received_buffer_request_sender, input_sender)
}
SocketType::WebSocket => self.start_websocket_listener(
received_buffer_request_sender,
input_sender,
fresh_input_msg_chunker,
),
SocketType::None => {
// if we did not start the socket, it means we're running (supposedly) in the native mode
// and hence we should announce 'ourselves' to the buffer
+29 -15
View File
@@ -3,6 +3,7 @@
use client_core::client::{
inbound_messages::{InputMessage, InputMessageSender},
real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker,
received_buffer::{
ReceivedBufferMessage, ReceivedBufferRequestSender, ReconstructedMessagesReceiver,
},
@@ -13,6 +14,7 @@ use log::*;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::ReplySurb;
use nymsphinx::receiver::ReconstructedMessage;
use rand::rngs::OsRng;
use tokio::net::TcpStream;
use tokio_tungstenite::{
accept_async,
@@ -38,6 +40,7 @@ pub(crate) struct Handler {
self_full_address: Recipient,
socket: Option<WebSocketStream<TcpStream>>,
received_response_type: ReceivedResponseType,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
}
// clone is used to use handler on a new connection, which initially is `None`
@@ -49,6 +52,7 @@ impl Clone for Handler {
self_full_address: self.self_full_address,
socket: None,
received_response_type: Default::default(),
fresh_input_msg_chunker: self.fresh_input_msg_chunker.clone(),
}
}
}
@@ -66,6 +70,7 @@ impl Handler {
msg_input: InputMessageSender,
buffer_requester: ReceivedBufferRequestSender,
self_full_address: Recipient,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
) -> Self {
Handler {
msg_input,
@@ -73,10 +78,11 @@ impl Handler {
self_full_address,
socket: None,
received_response_type: Default::default(),
fresh_input_msg_chunker,
}
}
fn handle_send(
async fn handle_send(
&mut self,
recipient: Recipient,
message: Vec<u8>,
@@ -84,18 +90,26 @@ impl Handler {
) -> Option<ServerResponse> {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
self.msg_input.unbounded_send(input_msg).unwrap();
// WIP(JON): here we should chunk the message, and send it to the sphinx_message_sender
self.fresh_input_msg_chunker
.on_input_message(input_msg)
.await;
//self.msg_input.unbounded_send(input_msg).unwrap();
None
}
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
async fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
if message.len() > ReplySurb::max_msg_len(Default::default()) {
return Some(ServerResponse::new_error(format!("too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes", message.len(), ReplySurb::max_msg_len(Default::default()))));
}
let input_msg = InputMessage::new_reply(reply_surb, message);
self.msg_input.unbounded_send(input_msg).unwrap();
if self.msg_input.send(input_msg).await.is_err() {
panic!();
}
None
}
@@ -104,22 +118,22 @@ impl Handler {
ServerResponse::SelfAddress(self.self_full_address)
}
fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
match request {
ClientRequest::Send {
recipient,
message,
with_reply_surb,
} => self.handle_send(recipient, message, with_reply_surb),
} => self.handle_send(recipient, message, with_reply_surb).await,
ClientRequest::Reply {
message,
reply_surb,
} => self.handle_reply(reply_surb, message),
} => self.handle_reply(reply_surb, message).await,
ClientRequest::SelfAddress => Some(self.handle_self_address()),
}
}
fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
debug!("Handling text message request");
trace!("Content: {:?}", msg);
@@ -128,13 +142,13 @@ impl Handler {
let response = match client_request {
Err(err) => Some(ServerResponse::Error(err)),
Ok(req) => self.handle_request(req),
Ok(req) => self.handle_request(req).await,
};
response.map(|resp| WsMessage::text(resp.into_text()))
}
fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
async fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
debug!("Handling binary message request");
self.received_response_type = ReceivedResponseType::Binary;
@@ -142,19 +156,19 @@ impl Handler {
let response = match client_request {
Err(err) => Some(ServerResponse::Error(err)),
Ok(req) => self.handle_request(req),
Ok(req) => self.handle_request(req).await,
};
response.map(|resp| WsMessage::Binary(resp.into_binary()))
}
fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
async fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from
// old version of this file.
match raw_request {
WsMessage::Text(text_message) => self.handle_text_message(text_message),
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message),
WsMessage::Text(text_message) => self.handle_text_message(text_message).await,
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message).await,
_ => None,
}
}
@@ -244,7 +258,7 @@ impl Handler {
break;
}
if let Some(response) = self.handle_ws_request(socket_msg) {
if let Some(response) = self.handle_ws_request(socket_msg).await {
if let Err(err) = self.send_websocket_response(response).await {
warn!(
"Failed to send message over websocket: {}. Assuming the connection is dead.",
+34 -7
View File
@@ -1,7 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021 - Nym Technologies SA <contat@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::client::config::Config;
use crate::error::Socks5ClientError;
@@ -15,6 +16,16 @@ use client_core::client::inbound_messages::{
};
use client_core::client::key_manager::KeyManager;
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use client_core::client::real_messages_control::acknowledgement_control::action_controller::{
Action, ActionSender,
};
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use client_core::client::real_messages_control::acknowledgement_control::{
self, PendingAcknowledgement,
};
use client_core::client::real_messages_control::real_traffic_stream::{
BatchRealMessageSender, RealMessage,
};
use client_core::client::real_messages_control::RealMessagesController;
use client_core::client::received_buffer::{
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
@@ -34,8 +45,13 @@ use gateway_client::{
MixnetMessageSender,
};
use log::*;
use nymsphinx::acknowledgements::AckKey;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use nymsphinx::params::PacketSize;
use nymsphinx::preparer::MessagePreparer;
use rand::rngs::OsRng;
use rand::{CryptoRng, Rng};
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
pub mod config;
@@ -118,7 +134,7 @@ impl NymClient {
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
) -> FreshInputMessageChunker<OsRng> {
let mut controller_config = client_core::client::real_messages_control::Config::new(
self.key_manager.ack_key(),
self.config.get_base().get_ack_wait_multiplier(),
@@ -139,15 +155,20 @@ impl NymClient {
info!("Starting real traffic stream...");
RealMessagesController::new(
let real_message_controller = RealMessagesController::new(
controller_config,
ack_receiver,
input_receiver,
mix_sender,
topology_accessor,
reply_key_storage,
)
.start_with_shutdown(shutdown);
);
let fresh_input_msg_chunker = real_message_controller.get_fresh_input_message_chunker();
real_message_controller.start_with_shutdown(shutdown);
fresh_input_msg_chunker
}
// buffer controlling all messages fetched from provider
@@ -280,18 +301,23 @@ impl NymClient {
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
shutdown: ShutdownListener,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
) {
info!("Starting socks5 listener...");
let auth_methods = vec![AuthenticationMethods::NoAuth as u8];
let allowed_users: Vec<User> = Vec::new();
let authenticator = Authenticator::new(auth_methods, allowed_users);
let mut sphinx_socks = SphinxSocksServer::new(
self.config.get_listening_port(),
authenticator,
self.config.get_provider_mix_address(),
self.as_mix_recipient(),
shutdown,
//self.key_manager.ack_key(),
//self.as_mix_recipient(),
fresh_input_msg_chunker,
);
tokio::spawn(async move { sphinx_socks.serve(msg_input, buffer_requester).await });
}
@@ -361,7 +387,7 @@ impl NymClient {
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
// channels responsible for controlling real messages
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(3);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
@@ -396,7 +422,7 @@ impl NymClient {
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
self.start_real_traffic_controller(
let fresh_input_msg_chunker = self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
@@ -421,6 +447,7 @@ impl NymClient {
received_buffer_request_sender,
input_sender,
shutdown.subscribe(),
fresh_input_msg_chunker,
);
info!("Client startup finished!");
+226 -2
View File
@@ -6,24 +6,41 @@ use super::types::{ResponseCode, SocksProxyError};
use super::{RESERVED, SOCKS_VERSION};
use client_core::client::inbound_messages::InputMessage;
use client_core::client::inbound_messages::InputMessageSender;
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use log::*;
use nymsphinx::acknowledgements::AckKey;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::preparer::MessagePreparer;
use pin_project::pin_project;
use proxy_helpers::connection_controller::{
ConnectionReceiver, ControllerCommand, ControllerSender,
};
use proxy_helpers::proxy_runner::ProxyRunner;
use rand::RngCore;
use rand::rngs::OsRng;
use rand::{CryptoRng, Rng, RngCore};
use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::{self, net::TcpStream};
use client_core::client::{
inbound_messages::InputMessageReceiver,
real_messages_control::{
acknowledgement_control::{
action_controller::{Action, ActionSender},
PendingAcknowledgement,
},
real_traffic_stream::{BatchRealMessageSender, RealMessage},
},
topology_control::TopologyAccessor,
};
#[pin_project(project = StateProject)]
enum StreamState {
Available(TcpStream),
@@ -142,6 +159,9 @@ pub(crate) struct SocksClient {
self_address: Recipient,
started_proxy: bool,
shutdown_listener: ShutdownListener,
//ack_key: Arc<AckKey>,
//ack_recipient: Recipient,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
}
impl Drop for SocksClient {
@@ -166,6 +186,9 @@ impl SocksClient {
controller_sender: ControllerSender,
self_address: Recipient,
shutdown_listener: ShutdownListener,
//ack_key: Arc<AckKey>,
//ack_recipient: Recipient,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
) -> Self {
let connection_id = Self::generate_random();
SocksClient {
@@ -180,6 +203,9 @@ impl SocksClient {
self_address,
started_proxy: false,
shutdown_listener,
//ack_key,
//ack_recipient,
fresh_input_msg_chunker,
}
}
@@ -231,7 +257,9 @@ impl SocksClient {
let msg = Message::Request(req);
let input_message = InputMessage::new_fresh(self.service_provider, msg.into_bytes(), false);
self.input_sender.unbounded_send(input_message).unwrap();
if self.input_sender.send(input_message).await.is_err() {
panic!();
}
}
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
@@ -246,7 +274,21 @@ impl SocksClient {
let connection_id = self.connection_id;
let input_sender = self.input_sender.clone();
// WIP(JON)
//self.fresh_input_msg_chunker;
let recipient = self.service_provider;
//let mut fresh_input_msg_chunker = self.fresh_input_msg_chunker.clone();
//let fn_fresh_input_msg_chunker = move |conn_id, read_data, socket_closed| {
// let provider_request = Request::new_send(conn_id, read_data, socket_closed);
// let provider_message = Message::Request(provider_request);
// let msg = InputMessage::new_fresh(recipient, provider_message.into_bytes(), false);
// fresh_input_msg_chunker.on_input_message(msg)
//};
//let msg_chunker = Box::new(MsgChunker::new(self.fresh_input_msg_chunker.clone()));
let msg_chunker = self.fresh_input_msg_chunker.clone();
let (stream, _) = ProxyRunner::new(
stream,
local_stream_remote,
@@ -255,6 +297,7 @@ impl SocksClient {
input_sender,
connection_id,
self.shutdown_listener.clone(),
Some(Box::new(msg_chunker)),
)
.run(move |conn_id, read_data, socket_closed| {
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
@@ -430,3 +473,184 @@ impl SocksClient {
Ok(methods)
}
}
//pub struct MsgChunker {
// fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
//}
//
//impl MsgChunker {
// fn new(fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>) -> Self {
// Self {
// fresh_input_msg_chunker,
// }
// }
//
// pub async fn on_input_message(&mut self, msg: InputMessage) {
// self.fresh_input_msg_chunker.on_input_message(msg);
// }
//}
//let mut fresh_input_msg_chunker = self.fresh_input_msg_chunker.clone();
//let fn_fresh_input_msg_chunker = move |conn_id, read_data, socket_closed| {
// let provider_request = Request::new_send(conn_id, read_data, socket_closed);
// let provider_message = Message::Request(provider_request);
// let msg = InputMessage::new_fresh(recipient, provider_message.into_bytes(), false);
// fresh_input_msg_chunker.on_input_message(msg)
//};
//struct FreshInputMessageChunker<R>
//where
// R: CryptoRng + Rng,
//{
// ack_key: Arc<AckKey>,
// ack_recipient: Recipient,
// message_preparer: MessagePreparer<R>,
// action_sender: ActionSender,
// real_message_sender: BatchRealMessageSender,
// topology_access: TopologyAccessor,
//}
//
//impl<R> FreshInputMessageChunker<R>
//where
// R: CryptoRng + Rng,
//{
// fn new(
// ack_key: Arc<AckKey>,
// ack_recipient: Recipient,
// message_preparer: MessagePreparer<R>,
// action_sender: ActionSender,
// real_message_sender: BatchRealMessageSender,
// topology_access: TopologyAccessor,
// ) -> Self {
// Self {
// ack_key,
// ack_recipient,
// message_preparer,
// action_sender,
// real_message_sender,
// topology_access,
// }
// }
//
// // we require topology for replies to generate surb_acks
// //async fn handle_reply(&mut self, reply_surb: ReplySurb, data: Vec<u8>) -> Option<RealMessage> {
// // let topology_permit = self.topology_access.get_read_permit().await;
// // let topology = match topology_permit.try_get_valid_topology_ref(&self.ack_recipient, None) {
// // Some(topology_ref) => topology_ref,
// // None => {
// // warn!("Could not process the message - the network topology is invalid");
// // return None;
// // }
// // };
//
// // match self
// // .message_preparer
// // .prepare_reply_for_use(data, reply_surb, topology, &self.ack_key)
// // .await
// // {
// // Ok((mix_packet, reply_id)) => {
// // // TODO: later probably write pending ack here
// // // and deal with them....
// // // ... somehow
// // Some(RealMessage::new(mix_packet, reply_id))
// // }
// // Err(err) => {
// // // TODO: should we have some mechanism to indicate to the user that the `reply_surb`
// // // could be reused since technically it wasn't used up here?
// // warn!("failed to deal with received reply surb - {:?}", err);
// // None
// // }
// // }
// //}
//
// async fn handle_fresh_message(
// &mut self,
// recipient: Recipient,
// content: Vec<u8>,
// with_reply_surb: bool,
// ) -> Option<Vec<RealMessage>> {
// let topology_permit = self.topology_access.get_read_permit().await;
// let topology = match topology_permit
// .try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
// {
// Some(topology_ref) => topology_ref,
// None => {
// warn!("Could not process the message - the network topology is invalid");
// return None;
// }
// };
//
// // split the message, attach optional reply surb
// let (split_message, reply_key) = self
// .message_preparer
// .prepare_and_split_message(content, with_reply_surb, topology)
// .expect("somehow the topology was invalid after all!");
//
// #[cfg(feature = "reply-surb")]
// if let Some(reply_key) = reply_key {
// self.reply_key_storage
// .insert_encryption_key(reply_key)
// .expect("Failed to insert surb reply key to the store!")
// }
//
// #[cfg(not(feature = "reply-surb"))]
// let _reply_key = reply_key;
//
// // encrypt chunks, put them inside sphinx packets and generate acks
// let mut pending_acks = Vec::with_capacity(split_message.len());
// let mut real_messages = Vec::with_capacity(split_message.len());
// for message_chunk in split_message {
// // we need to clone it because we need to keep it in memory in case we had to retransmit
// // it. And then we'd need to recreate entire ACK again.
// let chunk_clone = message_chunk.clone();
// let prepared_fragment = self
// .message_preparer
// .prepare_chunk_for_sending(chunk_clone, topology, &self.ack_key, &recipient)
// .unwrap();
//
// real_messages.push(RealMessage::new(
// prepared_fragment.mix_packet,
// message_chunk.fragment_identifier(),
// ));
//
// pending_acks.push(PendingAcknowledgement::new(
// message_chunk,
// prepared_fragment.total_delay,
// recipient,
// ));
// }
//
// // tells the controller to put this into the hashmap
// self.action_sender
// .unbounded_send(Action::new_insert(pending_acks))
// .unwrap();
//
// Some(real_messages)
// }
//
// pub async fn on_input_message(&mut self, msg: InputMessage) {
// let real_messages = match msg {
// InputMessage::Fresh {
// recipient,
// data,
// with_reply_surb,
// } => {
// self.handle_fresh_message(recipient, data, with_reply_surb)
// .await
// }
// InputMessage::Reply { .. } => panic!(),
// //InputMessage::Reply { reply_surb, data } => self
// // .handle_reply(reply_surb, data)
// // .await
// // .map(|message| vec![message]),
// };
//
// // there's no point in trying to send nothing
// if let Some(real_messages) = real_messages {
// // tells real message sender (with the poisson timer) to send this to the mix network
// self.real_message_sender
// .unbounded_send(real_messages)
// .unwrap();
// }
// }
//}
+16
View File
@@ -4,13 +4,17 @@ use super::{
mixnet_responses::MixnetResponseListener,
types::{ResponseCode, SocksProxyError},
};
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use client_core::client::{
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
};
use log::*;
use nymsphinx::acknowledgements::AckKey;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::Controller;
use rand::rngs::OsRng;
use std::net::SocketAddr;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::net::TcpListener;
@@ -21,6 +25,9 @@ pub struct SphinxSocksServer {
service_provider: Recipient,
self_address: Recipient,
shutdown: ShutdownListener,
//ack_key: Arc<AckKey>,
//ack_recipient: Recipient,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
}
impl SphinxSocksServer {
@@ -31,6 +38,9 @@ impl SphinxSocksServer {
service_provider: Recipient,
self_address: Recipient,
shutdown: ShutdownListener,
//ack_key: Arc<AckKey>,
//ack_recipient: Recipient,
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
) -> Self {
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
// just modify the config
@@ -42,6 +52,9 @@ impl SphinxSocksServer {
service_provider,
self_address,
shutdown,
//ack_key,
//ack_recipient,
fresh_input_msg_chunker,
}
}
@@ -84,6 +97,9 @@ impl SphinxSocksServer {
controller_sender.clone(),
self.self_address,
self.shutdown.clone(),
//self.ack_key.clone(),
//self.ack_recipient.clone(),
self.fresh_input_msg_chunker.clone(),
);
tokio::spawn(async move {
+4
View File
@@ -18,5 +18,9 @@ socks5-requests = { path = "../requests" }
ordered-buffer = { path = "../ordered-buffer" }
task = { path = "../../task" }
client-core = { path = "../../../clients/client-core" }
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
async-trait = "0.1.58"
[dev-dependencies]
tokio-test = "0.4.2"
@@ -1,21 +1,25 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::Chunker;
use super::MixProxySender;
use super::SHUTDOWN_TIMEOUT;
use crate::available_reader::AvailableReader;
use bytes::Bytes;
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
use futures::FutureExt;
use futures::StreamExt;
use log::*;
use ordered_buffer::OrderedMessageSender;
use rand::rngs::OsRng;
use socks5_requests::ConnectionId;
use std::time::Duration;
use std::{io, sync::Arc};
use task::ShutdownListener;
use tokio::select;
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
fn send_empty_close<F, S>(
async fn send_empty_close<F, S>(
connection_id: ConnectionId,
message_sender: &mut OrderedMessageSender,
mix_sender: &MixProxySender<S>,
@@ -24,12 +28,16 @@ fn send_empty_close<F, S>(
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
{
let ordered_msg = message_sender.wrap_message(Vec::new()).into_bytes();
mix_sender
.unbounded_send(adapter_fn(connection_id, ordered_msg, true))
.unwrap();
if mix_sender
.send(adapter_fn(connection_id, ordered_msg, true))
.await
.is_err()
{
panic!();
};
}
fn deal_with_data<F, S>(
async fn deal_with_data<F, S>(
read_data: Option<io::Result<Bytes>>,
local_destination_address: &str,
remote_source_address: &str,
@@ -37,6 +45,7 @@ fn deal_with_data<F, S>(
message_sender: &mut OrderedMessageSender,
mix_sender: &MixProxySender<S>,
adapter_fn: F,
msg_chunker: &mut Option<Box<dyn Chunker<S>>>,
) -> bool
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
@@ -63,9 +72,42 @@ where
// if we're sending through the mixnet increase the sequence number...
let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes();
mix_sender
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
.unwrap();
// WIP(JON): here we do the chunking, and send to real_message_sender instead
if let Some(chunker) = msg_chunker {
log::info!("({connection_id}): chunking and sending");
let msg = adapter_fn(connection_id, ordered_msg, is_finished);
chunker.chunk(msg).await;
log::info!("({connection_id}): chunking and sending done");
} else {
log::info!("ordered_msg.len: {}", ordered_msg.len());
if ordered_msg.len() > 10000 {
log::info!("Sending large");
log::info!("capacity: {}", mix_sender.capacity());
loop {
if mix_sender.capacity() > 2 {
if mix_sender
.send(adapter_fn(connection_id, ordered_msg, is_finished))
.await
.is_err()
{
panic!();
}
break;
}
sleep(Duration::from_millis(100)).await;
}
} else {
log::info!("Sending smaller");
if mix_sender
.send(adapter_fn(connection_id, ordered_msg, is_finished))
.await
.is_err()
{
panic!();
}
}
}
if is_finished {
// technically we already informed it when we sent the message to mixnet above
@@ -85,6 +127,7 @@ pub(super) async fn run_inbound<F, S>(
adapter_fn: F,
shutdown_notify: Arc<Notify>,
mut shutdown_listener: ShutdownListener,
mut msg_chunker: Option<Box<dyn Chunker<S>>>,
) -> OwnedReadHalf
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
@@ -98,7 +141,7 @@ where
loop {
select! {
read_data = &mut available_reader.next() => {
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn, &mut msg_chunker).await {
break
}
}
@@ -106,7 +149,7 @@ where
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
// inform remote just in case it was closed because of lack of heartbeat.
// worst case the remote will just have couple of false negatives
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn).await;
break;
}
_ = shutdown_listener.recv() => {
@@ -2,17 +2,25 @@
// SPDX-License-Identifier: Apache-2.0
use crate::connection_controller::ConnectionReceiver;
use async_trait::async_trait;
use futures::channel::mpsc;
use rand::rngs::OsRng;
use socks5_requests::ConnectionId;
use std::{sync::Arc, time::Duration};
use task::ShutdownListener;
use tokio::{net::TcpStream, sync::Notify};
use client_core::client::{
inbound_messages::InputMessage,
real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker,
};
mod inbound;
mod outbound;
// TODO: make this configurable
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
//const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 10);
#[derive(Debug)]
pub struct ProxyMessage {
@@ -29,11 +37,11 @@ impl From<(Vec<u8>, bool)> for ProxyMessage {
}
}
pub type MixProxySender<S> = mpsc::UnboundedSender<S>;
pub type MixProxySender<S> = tokio::sync::mpsc::Sender<S>;
// TODO: when we finally get to implementing graceful shutdown,
// on Drop this guy should tell the remote that it's closed now
#[derive(Debug)]
//#[derive(Debug)]
pub struct ProxyRunner<S> {
/// receives data from the mix network and sends that into the socket
mix_receiver: Option<ConnectionReceiver>,
@@ -48,6 +56,26 @@ pub struct ProxyRunner<S> {
// Listens to shutdown commands from higher up
shutdown_listener: ShutdownListener,
msg_chunker: Option<Box<dyn Chunker<S>>>,
}
#[async_trait]
pub trait Chunker<S>: Send {
async fn chunk(&mut self, msg: S);
fn clone_box(&self) -> Box<dyn Chunker<S>>;
}
#[async_trait]
impl Chunker<InputMessage> for FreshInputMessageChunker<OsRng> {
async fn chunk(&mut self, msg: InputMessage) {
self.on_input_message(msg).await;
}
fn clone_box(&self) -> Box<dyn Chunker<InputMessage>> {
Box::new(self.clone())
}
}
impl<S> ProxyRunner<S>
@@ -62,6 +90,7 @@ where
mix_sender: MixProxySender<S>,
connection_id: ConnectionId,
shutdown_listener: ShutdownListener,
msg_chunker: Option<Box<dyn Chunker<S>>>,
) -> Self {
ProxyRunner {
mix_receiver: Some(mix_receiver),
@@ -71,6 +100,7 @@ where
remote_source_address,
connection_id,
shutdown_listener,
msg_chunker,
}
}
@@ -78,10 +108,12 @@ where
// request/response as required by entity running particular side of the proxy.
pub async fn run<F>(mut self, adapter_fn: F) -> Self
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
{
let (read_half, write_half) = self.socket.take().unwrap().into_split();
let shutdown_notify = Arc::new(Notify::new());
//let chunker = Some(self.msg_chunker.as_ref().unwrap().clone_box());
let chunker = self.msg_chunker.as_ref().map(|c| c.clone_box());
// should run until either inbound closes or is notified from outbound
let inbound_future = inbound::run_inbound(
@@ -93,6 +125,7 @@ where
adapter_fn,
Arc::clone(&shutdown_notify),
self.shutdown_listener.clone(),
chunker,
);
let outbound_future = outbound::run_outbound(
@@ -40,7 +40,7 @@ impl Connection {
pub(crate) async fn run_proxy(
&mut self,
mix_receiver: ConnectionReceiver,
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
shutdown: ShutdownListener,
) {
let stream = self.conn.take().unwrap();
@@ -55,6 +55,7 @@ impl Connection {
mix_sender,
connection_id,
shutdown,
None,
)
.run(move |conn_id, read_data, socket_closed| {
(
+22 -13
View File
@@ -66,11 +66,11 @@ impl ServiceProvider {
/// via the `websocket_writer`.
async fn mixnet_response_listener(
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
mut mix_reader: mpsc::UnboundedReceiver<(Socks5Message, Recipient)>,
mut mix_reader: tokio::sync::mpsc::Receiver<(Socks5Message, Recipient)>,
stats_collector: Option<ServiceStatisticsCollector>,
) {
// TODO: wire SURBs in here once they're available
while let Some((msg, return_address)) = mix_reader.next().await {
while let Some((msg, return_address)) = mix_reader.recv().await {
if let Some(stats_collector) = stats_collector.as_ref() {
if let Some(remote_addr) = stats_collector
.connected_services
@@ -134,7 +134,7 @@ impl ServiceProvider {
remote_addr: String,
return_address: Recipient,
controller_sender: ControllerSender,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
shutdown: ShutdownListener,
) {
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
@@ -147,12 +147,16 @@ impl ServiceProvider {
);
// inform the remote that the connection is closed before it even was established
mix_input_sender
.unbounded_send((
if mix_input_sender
.send((
Socks5Message::Response(Response::new(conn_id, Vec::new(), true)),
return_address,
))
.unwrap();
.await
.is_err()
{
panic!();
}
return;
}
@@ -188,10 +192,10 @@ impl ServiceProvider {
);
}
fn handle_proxy_connect(
async fn handle_proxy_connect(
&mut self,
controller_sender: &mut ControllerSender,
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
conn_id: ConnectionId,
remote_addr: String,
return_address: Recipient,
@@ -200,14 +204,18 @@ impl ServiceProvider {
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
log::info!("{}", log_msg);
mix_input_sender
.unbounded_send((
if mix_input_sender
.send((
Socks5Message::NetworkRequesterResponse(NetworkRequesterResponse::new(
conn_id, log_msg,
)),
return_address,
))
.unwrap();
.await
.is_err()
{
panic!();
}
return;
}
@@ -244,7 +252,7 @@ impl ServiceProvider {
&mut self,
raw_request: &[u8],
controller_sender: &mut ControllerSender,
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
stats_collector: Option<ServiceStatisticsCollector>,
shutdown: ShutdownListener,
) {
@@ -273,6 +281,7 @@ impl ServiceProvider {
req.return_address,
shutdown,
)
.await
}
Request::Send(conn_id, data, closed) => {
@@ -307,7 +316,7 @@ impl ServiceProvider {
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
// going to be used by `mixnet_response_listener`
let (mix_input_sender, mix_input_receiver) =
mpsc::unbounded::<(Socks5Message, Recipient)>();
tokio::sync::mpsc::channel::<(Socks5Message, Recipient)>(3);
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
@@ -77,13 +77,13 @@ pub struct ServiceStatisticsCollector {
pub(crate) response_stats_data: Arc<RwLock<StatsData>>,
pub(crate) connected_services: Arc<RwLock<HashMap<ConnectionId, RemoteAddress>>>,
stats_provider_addr: Recipient,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
}
impl ServiceStatisticsCollector {
pub async fn new(
stats_provider_addr: Option<Recipient>,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
) -> Result<Self, StatsError> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(3))
@@ -175,20 +175,27 @@ impl StatisticsCollector for ServiceStatisticsCollector {
),
self.stats_provider_addr,
);
self.mix_input_sender
.unbounded_send((
if self
.mix_input_sender
.send((
Socks5Message::Request(connect_req),
self.stats_provider_addr,
))
.unwrap();
.await
.is_err()
{
panic!();
}
trace!("Sending data to statistics service");
let mut message_sender = OrderedMessageSender::new();
let ordered_msg = message_sender.wrap_message(msg).into_bytes();
let send_req = Request::new_send(conn_id, ordered_msg, true);
self.mix_input_sender
.unbounded_send((Socks5Message::Request(send_req), self.stats_provider_addr))
.unwrap();
if self.mix_input_sender
.send((Socks5Message::Request(send_req), self.stats_provider_addr))
.await.is_err() {
panic!();
}
Ok(())
}