Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 53f3b0f607 | |||
| a71508fdd7 | |||
| 98763d8a80 | |||
| 7da4d9439d | |||
| 7f0595e295 | |||
| 63d0c74ddd | |||
| 7e6b97fd64 | |||
| 6ba7c74dac | |||
| 3de9895203 | |||
| 006767d62c |
Generated
+20
-10
@@ -131,9 +131,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.53"
|
||||
version = "0.1.58"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
|
||||
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -588,6 +588,7 @@ dependencies = [
|
||||
"gateway-requests",
|
||||
"gloo-timers",
|
||||
"humantime-serde",
|
||||
"itertools",
|
||||
"log",
|
||||
"nonexhaustive-delayqueue",
|
||||
"nymsphinx",
|
||||
@@ -2530,9 +2531,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.10.3"
|
||||
version = "0.10.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
|
||||
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
@@ -4012,11 +4013,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.37"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
|
||||
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
|
||||
dependencies = [
|
||||
"unicode-xid",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4122,10 +4123,13 @@ dependencies = [
|
||||
name = "proxy-helpers"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"client-core",
|
||||
"futures",
|
||||
"log",
|
||||
"ordered-buffer",
|
||||
"rand 0.7.3",
|
||||
"socks5-requests",
|
||||
"task",
|
||||
"tokio",
|
||||
@@ -5582,13 +5586,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.91"
|
||||
version = "1.0.103"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
|
||||
checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"unicode-xid",
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6310,6 +6314,12 @@ version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-normalization"
|
||||
version = "0.1.9"
|
||||
|
||||
@@ -31,6 +31,7 @@ validator-client = { path = "../../common/client-libs/validator-client", default
|
||||
tap = "1.0.1"
|
||||
|
||||
tokio = { version = "1.21.2", features = ["time", "macros"]}
|
||||
itertools = "0.10.5"
|
||||
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-bindgen-futures]
|
||||
version = "0.4"
|
||||
@@ -56,4 +57,4 @@ tempfile = "3.1.0"
|
||||
default = ["reply-surb"]
|
||||
wasm = ["gateway-client/wasm"]
|
||||
coconut = ["gateway-client/coconut", "gateway-requests/coconut"]
|
||||
reply-surb = ["sled"]
|
||||
reply-surb = ["sled"]
|
||||
|
||||
@@ -2,8 +2,8 @@ use futures::channel::mpsc;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::ReplySurb;
|
||||
|
||||
pub type InputMessageSender = mpsc::UnboundedSender<InputMessage>;
|
||||
pub type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
|
||||
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
|
||||
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum InputMessage {
|
||||
|
||||
+1
-1
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
|
||||
}
|
||||
|
||||
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
||||
debug!("Received an ack");
|
||||
trace!("Received an ack");
|
||||
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
||||
.map(FragmentIdentifier::try_from_bytes)
|
||||
{
|
||||
|
||||
+3
-3
@@ -13,7 +13,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub(crate) type ActionSender = UnboundedSender<Action>;
|
||||
pub type ActionSender = UnboundedSender<Action>;
|
||||
|
||||
// The actual data being sent off as well as potential key to the delay queue
|
||||
type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
|
||||
@@ -23,7 +23,7 @@ type PendingAckEntry = (Arc<PendingAcknowledgement>, Option<QueueKey>);
|
||||
// - received an ack so we want to remove an entry
|
||||
// - start a retransmission timer for sending the packet into the network (on either first try or retransmission)
|
||||
// - update the internal sphinx delay of an expired packet
|
||||
pub(crate) enum Action {
|
||||
pub enum Action {
|
||||
/// Inserts new `PendingAcknowledgement`s into the 'shared' state.
|
||||
/// Initiated by `InputMessageListener`
|
||||
InsertPending(Vec<PendingAcknowledgement>),
|
||||
@@ -44,7 +44,7 @@ pub(crate) enum Action {
|
||||
}
|
||||
|
||||
impl Action {
|
||||
pub(crate) fn new_insert(pending_acks: Vec<PendingAcknowledgement>) -> Self {
|
||||
pub fn new_insert(pending_acks: Vec<PendingAcknowledgement>) -> Self {
|
||||
Action::InsertPending(pending_acks)
|
||||
}
|
||||
|
||||
|
||||
+200
-4
@@ -15,6 +15,7 @@ use nymsphinx::preparer::MessagePreparer;
|
||||
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(feature = "reply-surb")]
|
||||
use crate::client::reply_key_storage::ReplyKeyStorage;
|
||||
@@ -41,6 +42,10 @@ impl<R> InputMessageListener<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
pub fn get_action_sender(&self) -> ActionSender {
|
||||
self.action_sender.clone()
|
||||
}
|
||||
|
||||
// at this point I'm not entirely sure how to deal with this warning without
|
||||
// some considerable refactoring
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -182,9 +187,9 @@ where
|
||||
// there's no point in trying to send nothing
|
||||
if let Some(real_messages) = real_messages {
|
||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||
self.real_message_sender
|
||||
.unbounded_send(real_messages)
|
||||
.unwrap();
|
||||
if self.real_message_sender.send(real_messages).await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,7 +199,7 @@ where
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
input_msg = self.input_receiver.next() => match input_msg {
|
||||
input_msg = self.input_receiver.recv() => match input_msg {
|
||||
Some(input_msg) => {
|
||||
self.on_input_message(input_msg).await;
|
||||
},
|
||||
@@ -220,3 +225,194 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FreshInputMessageChunker<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
ack_key: Arc<AckKey>,
|
||||
ack_recipient: Recipient,
|
||||
message_preparer: MessagePreparer<R>,
|
||||
action_sender: ActionSender,
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
}
|
||||
|
||||
impl<R> FreshInputMessageChunker<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
pub fn new(
|
||||
ack_key: Arc<AckKey>,
|
||||
ack_recipient: Recipient,
|
||||
message_preparer: MessagePreparer<R>,
|
||||
action_sender: ActionSender,
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
) -> Self {
|
||||
Self {
|
||||
ack_key,
|
||||
ack_recipient,
|
||||
message_preparer,
|
||||
action_sender,
|
||||
real_message_sender,
|
||||
topology_access,
|
||||
}
|
||||
}
|
||||
|
||||
// we require topology for replies to generate surb_acks
|
||||
//async fn handle_reply(&mut self, reply_surb: ReplySurb, data: Vec<u8>) -> Option<RealMessage> {
|
||||
// let topology_permit = self.topology_access.get_read_permit().await;
|
||||
// let topology = match topology_permit.try_get_valid_topology_ref(&self.ack_recipient, None) {
|
||||
// Some(topology_ref) => topology_ref,
|
||||
// None => {
|
||||
// warn!("Could not process the message - the network topology is invalid");
|
||||
// return None;
|
||||
// }
|
||||
// };
|
||||
|
||||
// match self
|
||||
// .message_preparer
|
||||
// .prepare_reply_for_use(data, reply_surb, topology, &self.ack_key)
|
||||
// .await
|
||||
// {
|
||||
// Ok((mix_packet, reply_id)) => {
|
||||
// // TODO: later probably write pending ack here
|
||||
// // and deal with them....
|
||||
// // ... somehow
|
||||
// Some(RealMessage::new(mix_packet, reply_id))
|
||||
// }
|
||||
// Err(err) => {
|
||||
// // TODO: should we have some mechanism to indicate to the user that the `reply_surb`
|
||||
// // could be reused since technically it wasn't used up here?
|
||||
// warn!("failed to deal with received reply surb - {:?}", err);
|
||||
// None
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
async fn handle_fresh_message(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
content: Vec<u8>,
|
||||
with_reply_surb: bool,
|
||||
) -> Option<Vec<RealMessage>> {
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
let topology = match topology_permit
|
||||
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
|
||||
{
|
||||
Some(topology_ref) => topology_ref,
|
||||
None => {
|
||||
warn!("Could not process the message - the network topology is invalid");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// split the message, attach optional reply surb
|
||||
let (split_message, reply_key) = self
|
||||
.message_preparer
|
||||
.prepare_and_split_message(content, with_reply_surb, topology)
|
||||
.expect("somehow the topology was invalid after all!");
|
||||
|
||||
//#[cfg(feature = "reply-surb")]
|
||||
//if let Some(reply_key) = reply_key {
|
||||
// self.reply_key_storage
|
||||
// .insert_encryption_key(reply_key)
|
||||
// .expect("Failed to insert surb reply key to the store!")
|
||||
//}
|
||||
|
||||
#[cfg(not(feature = "reply-surb"))]
|
||||
let _reply_key = reply_key;
|
||||
|
||||
// encrypt chunks, put them inside sphinx packets and generate acks
|
||||
let mut pending_acks = Vec::with_capacity(split_message.len());
|
||||
let mut real_messages = Vec::with_capacity(split_message.len());
|
||||
for message_chunk in split_message {
|
||||
// we need to clone it because we need to keep it in memory in case we had to retransmit
|
||||
// it. And then we'd need to recreate entire ACK again.
|
||||
let chunk_clone = message_chunk.clone();
|
||||
let prepared_fragment = self
|
||||
.message_preparer
|
||||
.prepare_chunk_for_sending(chunk_clone, topology, &self.ack_key, &recipient)
|
||||
.unwrap();
|
||||
|
||||
real_messages.push(RealMessage::new(
|
||||
prepared_fragment.mix_packet,
|
||||
message_chunk.fragment_identifier(),
|
||||
));
|
||||
|
||||
pending_acks.push(PendingAcknowledgement::new(
|
||||
message_chunk,
|
||||
prepared_fragment.total_delay,
|
||||
recipient,
|
||||
));
|
||||
}
|
||||
|
||||
// tells the controller to put this into the hashmap
|
||||
self.action_sender
|
||||
.unbounded_send(Action::new_insert(pending_acks))
|
||||
.unwrap();
|
||||
|
||||
Some(real_messages)
|
||||
}
|
||||
|
||||
pub async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
log::info!("Using new chunker!");
|
||||
let real_messages = match msg {
|
||||
InputMessage::Fresh {
|
||||
recipient,
|
||||
data,
|
||||
with_reply_surb,
|
||||
} => {
|
||||
self.handle_fresh_message(recipient, data, with_reply_surb)
|
||||
.await
|
||||
}
|
||||
InputMessage::Reply { .. } => panic!(),
|
||||
//InputMessage::Reply { reply_surb, data } => self
|
||||
// .handle_reply(reply_surb, data)
|
||||
// .await
|
||||
// .map(|message| vec![message]),
|
||||
};
|
||||
|
||||
// there's no point in trying to send nothing
|
||||
if let Some(real_messages) = real_messages {
|
||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
log::info!("chunked into {} messages", real_messages.len());
|
||||
log::info!("current capacity: {}", self.real_message_sender.capacity());
|
||||
if real_messages.len() > 10 {
|
||||
// only send if there is nothing in queue
|
||||
log::info!("sending large message(s)");
|
||||
//let grouped: Vec<Vec<RealMessage>> = real_messages
|
||||
// .into_iter()
|
||||
// .chunks(10)
|
||||
// .into_iter()
|
||||
// .map(|c| c.collect())
|
||||
// .collect();
|
||||
|
||||
for msg in real_messages {
|
||||
let msgs = vec![msg];
|
||||
//let msgs = msg.to_vec();
|
||||
loop {
|
||||
if self.real_message_sender.capacity() > 2 {
|
||||
if self.real_message_sender.send(msgs).await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
log::error!("finished sending large message(s)");
|
||||
} else {
|
||||
log::info!("sending small message(s)");
|
||||
if self.real_message_sender.send(real_messages).await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+30
-9
@@ -1,6 +1,8 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use self::action_controller::ActionSender;
|
||||
use self::input_message_listener::FreshInputMessageChunker;
|
||||
use self::{
|
||||
acknowledgement_listener::AcknowledgementListener, action_controller::ActionController,
|
||||
input_message_listener::InputMessageListener,
|
||||
@@ -31,8 +33,8 @@ use std::{
|
||||
use crate::client::reply_key_storage::ReplyKeyStorage;
|
||||
|
||||
mod acknowledgement_listener;
|
||||
mod action_controller;
|
||||
mod input_message_listener;
|
||||
pub mod action_controller;
|
||||
pub mod input_message_listener;
|
||||
mod retransmission_request_listener;
|
||||
mod sent_notification_listener;
|
||||
|
||||
@@ -52,7 +54,7 @@ type SentPacketNotificationReceiver = mpsc::UnboundedReceiver<FragmentIdentifier
|
||||
|
||||
/// Structure representing a data `Fragment` that is on-route to the specified `Recipient`
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PendingAcknowledgement {
|
||||
pub struct PendingAcknowledgement {
|
||||
message_chunk: Fragment,
|
||||
delay: SphinxDelay,
|
||||
recipient: Recipient,
|
||||
@@ -60,7 +62,7 @@ pub(crate) struct PendingAcknowledgement {
|
||||
|
||||
impl PendingAcknowledgement {
|
||||
/// Creates new instance of `PendingAcknowledgement` using the provided data.
|
||||
fn new(message_chunk: Fragment, delay: SphinxDelay, recipient: Recipient) -> Self {
|
||||
pub fn new(message_chunk: Fragment, delay: SphinxDelay, recipient: Recipient) -> Self {
|
||||
PendingAcknowledgement {
|
||||
message_chunk,
|
||||
delay,
|
||||
@@ -110,7 +112,7 @@ impl AcknowledgementControllerConnectors {
|
||||
}
|
||||
|
||||
/// Configurable parameters of the `AcknowledgementController`
|
||||
pub(super) struct Config {
|
||||
pub struct Config {
|
||||
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
|
||||
ack_wait_addition: Duration,
|
||||
|
||||
@@ -118,17 +120,17 @@ pub(super) struct Config {
|
||||
ack_wait_multiplier: f64,
|
||||
|
||||
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
|
||||
average_ack_delay: Duration,
|
||||
pub average_ack_delay: Duration,
|
||||
|
||||
/// Average delay a data packet is going to get delayed at a single mixnode.
|
||||
average_packet_delay: Duration,
|
||||
pub average_packet_delay: Duration,
|
||||
|
||||
/// Predefined packet size used for the encapsulated messages.
|
||||
packet_size: PacketSize,
|
||||
pub packet_size: PacketSize,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub(super) fn new(
|
||||
pub fn new(
|
||||
ack_wait_addition: Duration,
|
||||
ack_wait_multiplier: f64,
|
||||
average_ack_delay: Duration,
|
||||
@@ -155,6 +157,7 @@ where
|
||||
{
|
||||
acknowledgement_listener: AcknowledgementListener,
|
||||
input_message_listener: InputMessageListener<R>,
|
||||
pub fresh_input_msg_chunker: FreshInputMessageChunker<R>,
|
||||
retransmission_request_listener: RetransmissionRequestListener<R>,
|
||||
sent_notification_listener: SentNotificationListener,
|
||||
action_controller: ActionController,
|
||||
@@ -164,6 +167,14 @@ impl<R> AcknowledgementController<R>
|
||||
where
|
||||
R: 'static + CryptoRng + Rng + Clone + Send,
|
||||
{
|
||||
pub fn get_action_sender(&self) -> ActionSender {
|
||||
self.input_message_listener.get_action_sender()
|
||||
}
|
||||
|
||||
//pub fn get_real_message_sender(&self) -> BatchRealMessageSender {
|
||||
//self.input_message_listener.get_real_message_sender()
|
||||
//}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
config: Config,
|
||||
@@ -209,6 +220,15 @@ where
|
||||
reply_key_storage,
|
||||
);
|
||||
|
||||
let fresh_input_msg_chunker = FreshInputMessageChunker::new(
|
||||
Arc::clone(&ack_key),
|
||||
ack_recipient,
|
||||
message_preparer.clone(),
|
||||
action_sender.clone(),
|
||||
connectors.real_message_sender.clone(),
|
||||
topology_access.clone(),
|
||||
);
|
||||
|
||||
// will listen for any ack timeouts and trigger retransmission
|
||||
let retransmission_request_listener = RetransmissionRequestListener::new(
|
||||
Arc::clone(&ack_key),
|
||||
@@ -228,6 +248,7 @@ where
|
||||
AcknowledgementController {
|
||||
acknowledgement_listener,
|
||||
input_message_listener,
|
||||
fresh_input_msg_chunker,
|
||||
retransmission_request_listener,
|
||||
sent_notification_listener,
|
||||
action_controller,
|
||||
|
||||
+8
-3
@@ -112,12 +112,17 @@ where
|
||||
.unwrap();
|
||||
|
||||
// send to `OutQueueControl` to eventually send to the mix network
|
||||
self.real_message_sender
|
||||
.unbounded_send(vec![RealMessage::new(
|
||||
if self
|
||||
.real_message_sender
|
||||
.send(vec![RealMessage::new(
|
||||
prepared_fragment.mix_packet,
|
||||
frag_id,
|
||||
)])
|
||||
.unwrap();
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
// INPUT2: Acks from mix
|
||||
// OUTPUT: MixMessage to mix traffic
|
||||
|
||||
use self::acknowledgement_control::action_controller::ActionSender;
|
||||
use self::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use self::{
|
||||
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
|
||||
};
|
||||
@@ -27,8 +29,8 @@ use std::time::Duration;
|
||||
#[cfg(feature = "reply-surb")]
|
||||
use crate::client::reply_key_storage::ReplyKeyStorage;
|
||||
|
||||
mod acknowledgement_control;
|
||||
mod real_traffic_stream;
|
||||
pub mod acknowledgement_control;
|
||||
pub mod real_traffic_stream;
|
||||
|
||||
// TODO: ack_key and self_recipient shouldn't really be part of this config
|
||||
pub struct Config {
|
||||
@@ -36,10 +38,10 @@ pub struct Config {
|
||||
ack_key: Arc<AckKey>,
|
||||
|
||||
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
|
||||
ack_wait_addition: Duration,
|
||||
pub ack_wait_addition: Duration,
|
||||
|
||||
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the multiplier `a`
|
||||
ack_wait_multiplier: f64,
|
||||
pub ack_wait_multiplier: f64,
|
||||
|
||||
/// Address of `this` client.
|
||||
self_recipient: Recipient,
|
||||
@@ -48,17 +50,17 @@ pub struct Config {
|
||||
average_message_sending_delay: Duration,
|
||||
|
||||
/// Average delay a data packet is going to get delayed at a single mixnode.
|
||||
average_packet_delay_duration: Duration,
|
||||
pub average_packet_delay_duration: Duration,
|
||||
|
||||
/// Average delay an acknowledgement packet is going to get delayed at a single mixnode.
|
||||
average_ack_delay_duration: Duration,
|
||||
pub average_ack_delay_duration: Duration,
|
||||
|
||||
/// Controls whether the main packet stream constantly produces packets according to the predefined
|
||||
/// poisson distribution.
|
||||
disable_main_poisson_packet_distribution: bool,
|
||||
|
||||
/// Predefined packet size used for the encapsulated messages.
|
||||
packet_size: PacketSize,
|
||||
pub packet_size: PacketSize,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
@@ -103,6 +105,14 @@ where
|
||||
// obviously when we finally make shared rng that is on 'higher' level, this should become
|
||||
// generic `R`
|
||||
impl RealMessagesController<OsRng> {
|
||||
pub fn get_action_sender(&self) -> ActionSender {
|
||||
self.ack_control.get_action_sender()
|
||||
}
|
||||
|
||||
pub fn get_fresh_input_message_chunker(&self) -> FreshInputMessageChunker<OsRng> {
|
||||
self.ack_control.fresh_input_msg_chunker.clone()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
config: Config,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
@@ -113,7 +123,7 @@ impl RealMessagesController<OsRng> {
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
let (real_message_sender, real_message_receiver) = mpsc::unbounded();
|
||||
let (real_message_sender, real_message_receiver) = tokio::sync::mpsc::channel(3);
|
||||
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
|
||||
|
||||
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
|
||||
|
||||
@@ -226,13 +226,13 @@ where
|
||||
received_buffer: VecDeque<RealMessage>,
|
||||
}
|
||||
|
||||
pub(crate) struct RealMessage {
|
||||
pub struct RealMessage {
|
||||
mix_packet: MixPacket,
|
||||
fragment_id: FragmentIdentifier,
|
||||
}
|
||||
|
||||
impl RealMessage {
|
||||
pub(crate) fn new(mix_packet: MixPacket, fragment_id: FragmentIdentifier) -> Self {
|
||||
pub fn new(mix_packet: MixPacket, fragment_id: FragmentIdentifier) -> Self {
|
||||
RealMessage {
|
||||
mix_packet,
|
||||
fragment_id,
|
||||
@@ -242,8 +242,8 @@ impl RealMessage {
|
||||
|
||||
// messages are already prepared, etc. the real point of it is to forward it to mix_traffic
|
||||
// after sufficient delay
|
||||
pub(crate) type BatchRealMessageSender = mpsc::UnboundedSender<Vec<RealMessage>>;
|
||||
type BatchRealMessageReceiver = mpsc::UnboundedReceiver<Vec<RealMessage>>;
|
||||
pub type BatchRealMessageSender = tokio::sync::mpsc::Sender<Vec<RealMessage>>;
|
||||
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<Vec<RealMessage>>;
|
||||
|
||||
pub(crate) enum StreamMessage {
|
||||
Cover,
|
||||
@@ -425,7 +425,7 @@ where
|
||||
}
|
||||
|
||||
// decide what kind of message to send
|
||||
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
||||
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||
// (and whoever is using the stream should panic)
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
@@ -472,7 +472,7 @@ where
|
||||
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
||||
}
|
||||
|
||||
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
||||
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||
// (and whoever is using the stream should panic)
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
|
||||
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
|
||||
}
|
||||
|
||||
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
||||
debug!(
|
||||
trace!(
|
||||
"Processing {:?} new message that might get added to the buffer!",
|
||||
msgs.len()
|
||||
);
|
||||
|
||||
@@ -50,7 +50,7 @@ impl<'a> Deref for TopologyReadPermit<'a> {
|
||||
impl<'a> TopologyReadPermit<'a> {
|
||||
/// Using provided topology read permit, tries to get an immutable reference to the underlying
|
||||
/// topology. For obvious reasons the lifetime of the topology reference is bound to the permit.
|
||||
pub(super) fn try_get_valid_topology_ref(
|
||||
pub fn try_get_valid_topology_ref(
|
||||
&'a self,
|
||||
ack_recipient: &Recipient,
|
||||
packet_recipient: Option<&Recipient>,
|
||||
|
||||
@@ -8,6 +8,7 @@ use client_core::client::inbound_messages::{
|
||||
use client_core::client::key_manager::KeyManager;
|
||||
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
|
||||
use client_core::client::real_messages_control;
|
||||
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use client_core::client::real_messages_control::RealMessagesController;
|
||||
use client_core::client::received_buffer::{
|
||||
ReceivedBufferMessage, ReceivedBufferRequestReceiver, ReceivedBufferRequestSender,
|
||||
@@ -31,6 +32,7 @@ use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::addressing::nodes::NodeIdentity;
|
||||
use nymsphinx::anonymous_replies::ReplySurb;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use rand::rngs::OsRng;
|
||||
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
|
||||
|
||||
use crate::client::config::{Config, SocketType};
|
||||
@@ -118,7 +120,7 @@ impl NymClient {
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
) -> FreshInputMessageChunker<OsRng> {
|
||||
let mut controller_config = real_messages_control::Config::new(
|
||||
self.key_manager.ack_key(),
|
||||
self.config.get_base().get_ack_wait_multiplier(),
|
||||
@@ -139,15 +141,20 @@ impl NymClient {
|
||||
|
||||
info!("Starting real traffic stream...");
|
||||
|
||||
RealMessagesController::new(
|
||||
let real_message_controller = RealMessagesController::new(
|
||||
controller_config,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
)
|
||||
.start_with_shutdown(shutdown);
|
||||
);
|
||||
|
||||
let fresh_input_msg_chunker = real_message_controller.get_fresh_input_message_chunker();
|
||||
|
||||
real_message_controller.start_with_shutdown(shutdown);
|
||||
|
||||
fresh_input_msg_chunker
|
||||
}
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
@@ -279,11 +286,16 @@ impl NymClient {
|
||||
&self,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
|
||||
let websocket_handler =
|
||||
websocket::Handler::new(msg_input, buffer_requester, self.as_mix_recipient());
|
||||
let websocket_handler = websocket::Handler::new(
|
||||
msg_input,
|
||||
buffer_requester,
|
||||
self.as_mix_recipient(),
|
||||
fresh_input_msg_chunker,
|
||||
);
|
||||
|
||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||
}
|
||||
@@ -291,27 +303,42 @@ impl NymClient {
|
||||
/// EXPERIMENTAL DIRECT RUST API
|
||||
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
||||
/// well enough in local tests)
|
||||
pub fn send_message(&mut self, recipient: Recipient, message: Vec<u8>, with_reply_surb: bool) {
|
||||
pub async fn send_message(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
message: Vec<u8>,
|
||||
with_reply_surb: bool,
|
||||
) {
|
||||
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
||||
|
||||
self.input_tx
|
||||
if self
|
||||
.input_tx
|
||||
.as_ref()
|
||||
.expect("start method was not called before!")
|
||||
.unbounded_send(input_msg)
|
||||
.unwrap();
|
||||
.send(input_msg)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL DIRECT RUST API
|
||||
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
||||
/// well enough in local tests)
|
||||
pub fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
|
||||
pub async fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
|
||||
let input_msg = InputMessage::new_reply(reply_surb, message);
|
||||
|
||||
self.input_tx
|
||||
if self
|
||||
.input_tx
|
||||
.as_ref()
|
||||
.expect("start method was not called before!")
|
||||
.unbounded_send(input_msg)
|
||||
.unwrap();
|
||||
.send(input_msg)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
|
||||
/// EXPERIMENTAL DIRECT RUST API
|
||||
@@ -368,7 +395,7 @@ impl NymClient {
|
||||
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
||||
|
||||
// channels responsible for controlling real messages
|
||||
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
|
||||
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(3);
|
||||
|
||||
// channels responsible for controlling ack messages
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
@@ -403,7 +430,7 @@ impl NymClient {
|
||||
let sphinx_message_sender =
|
||||
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
|
||||
|
||||
self.start_real_traffic_controller(
|
||||
let fresh_input_msg_chunker = self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
ack_receiver,
|
||||
@@ -425,9 +452,11 @@ impl NymClient {
|
||||
}
|
||||
|
||||
match self.config.get_socket_type() {
|
||||
SocketType::WebSocket => {
|
||||
self.start_websocket_listener(received_buffer_request_sender, input_sender)
|
||||
}
|
||||
SocketType::WebSocket => self.start_websocket_listener(
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
fresh_input_msg_chunker,
|
||||
),
|
||||
SocketType::None => {
|
||||
// if we did not start the socket, it means we're running (supposedly) in the native mode
|
||||
// and hence we should announce 'ourselves' to the buffer
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use client_core::client::{
|
||||
inbound_messages::{InputMessage, InputMessageSender},
|
||||
real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker,
|
||||
received_buffer::{
|
||||
ReceivedBufferMessage, ReceivedBufferRequestSender, ReconstructedMessagesReceiver,
|
||||
},
|
||||
@@ -13,6 +14,7 @@ use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::ReplySurb;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use rand::rngs::OsRng;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::{
|
||||
accept_async,
|
||||
@@ -38,6 +40,7 @@ pub(crate) struct Handler {
|
||||
self_full_address: Recipient,
|
||||
socket: Option<WebSocketStream<TcpStream>>,
|
||||
received_response_type: ReceivedResponseType,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
}
|
||||
|
||||
// clone is used to use handler on a new connection, which initially is `None`
|
||||
@@ -49,6 +52,7 @@ impl Clone for Handler {
|
||||
self_full_address: self.self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
fresh_input_msg_chunker: self.fresh_input_msg_chunker.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -66,6 +70,7 @@ impl Handler {
|
||||
msg_input: InputMessageSender,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
self_full_address: Recipient,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
) -> Self {
|
||||
Handler {
|
||||
msg_input,
|
||||
@@ -73,10 +78,11 @@ impl Handler {
|
||||
self_full_address,
|
||||
socket: None,
|
||||
received_response_type: Default::default(),
|
||||
fresh_input_msg_chunker,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_send(
|
||||
async fn handle_send(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
message: Vec<u8>,
|
||||
@@ -84,18 +90,26 @@ impl Handler {
|
||||
) -> Option<ServerResponse> {
|
||||
// the ack control is now responsible for chunking, etc.
|
||||
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
||||
|
||||
// WIP(JON): here we should chunk the message, and send it to the sphinx_message_sender
|
||||
self.fresh_input_msg_chunker
|
||||
.on_input_message(input_msg)
|
||||
.await;
|
||||
|
||||
//self.msg_input.unbounded_send(input_msg).unwrap();
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
||||
async fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
||||
if message.len() > ReplySurb::max_msg_len(Default::default()) {
|
||||
return Some(ServerResponse::new_error(format!("too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes", message.len(), ReplySurb::max_msg_len(Default::default()))));
|
||||
}
|
||||
|
||||
let input_msg = InputMessage::new_reply(reply_surb, message);
|
||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
||||
if self.msg_input.send(input_msg).await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
@@ -104,22 +118,22 @@ impl Handler {
|
||||
ServerResponse::SelfAddress(self.self_full_address)
|
||||
}
|
||||
|
||||
fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||
match request {
|
||||
ClientRequest::Send {
|
||||
recipient,
|
||||
message,
|
||||
with_reply_surb,
|
||||
} => self.handle_send(recipient, message, with_reply_surb),
|
||||
} => self.handle_send(recipient, message, with_reply_surb).await,
|
||||
ClientRequest::Reply {
|
||||
message,
|
||||
reply_surb,
|
||||
} => self.handle_reply(reply_surb, message),
|
||||
} => self.handle_reply(reply_surb, message).await,
|
||||
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
|
||||
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
|
||||
debug!("Handling text message request");
|
||||
trace!("Content: {:?}", msg);
|
||||
|
||||
@@ -128,13 +142,13 @@ impl Handler {
|
||||
|
||||
let response = match client_request {
|
||||
Err(err) => Some(ServerResponse::Error(err)),
|
||||
Ok(req) => self.handle_request(req),
|
||||
Ok(req) => self.handle_request(req).await,
|
||||
};
|
||||
|
||||
response.map(|resp| WsMessage::text(resp.into_text()))
|
||||
}
|
||||
|
||||
fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
|
||||
async fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
|
||||
debug!("Handling binary message request");
|
||||
|
||||
self.received_response_type = ReceivedResponseType::Binary;
|
||||
@@ -142,19 +156,19 @@ impl Handler {
|
||||
|
||||
let response = match client_request {
|
||||
Err(err) => Some(ServerResponse::Error(err)),
|
||||
Ok(req) => self.handle_request(req),
|
||||
Ok(req) => self.handle_request(req).await,
|
||||
};
|
||||
|
||||
response.map(|resp| WsMessage::Binary(resp.into_binary()))
|
||||
}
|
||||
|
||||
fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
|
||||
async fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
|
||||
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
||||
// them and let's test that claim. If that's not the case, just copy code from
|
||||
// old version of this file.
|
||||
match raw_request {
|
||||
WsMessage::Text(text_message) => self.handle_text_message(text_message),
|
||||
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message),
|
||||
WsMessage::Text(text_message) => self.handle_text_message(text_message).await,
|
||||
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message).await,
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -244,7 +258,7 @@ impl Handler {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(response) = self.handle_ws_request(socket_msg) {
|
||||
if let Some(response) = self.handle_ws_request(socket_msg).await {
|
||||
if let Err(err) = self.send_websocket_response(response).await {
|
||||
warn!(
|
||||
"Failed to send message over websocket: {}. Assuming the connection is dead.",
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// Copyright 2021 - Nym Technologies SA <contat@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::client::config::Config;
|
||||
use crate::error::Socks5ClientError;
|
||||
@@ -15,6 +16,16 @@ use client_core::client::inbound_messages::{
|
||||
};
|
||||
use client_core::client::key_manager::KeyManager;
|
||||
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
|
||||
use client_core::client::real_messages_control::acknowledgement_control::action_controller::{
|
||||
Action, ActionSender,
|
||||
};
|
||||
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use client_core::client::real_messages_control::acknowledgement_control::{
|
||||
self, PendingAcknowledgement,
|
||||
};
|
||||
use client_core::client::real_messages_control::real_traffic_stream::{
|
||||
BatchRealMessageSender, RealMessage,
|
||||
};
|
||||
use client_core::client::real_messages_control::RealMessagesController;
|
||||
use client_core::client::received_buffer::{
|
||||
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
|
||||
@@ -34,8 +45,13 @@ use gateway_client::{
|
||||
MixnetMessageSender,
|
||||
};
|
||||
use log::*;
|
||||
use nymsphinx::acknowledgements::AckKey;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::addressing::nodes::NodeIdentity;
|
||||
use nymsphinx::params::PacketSize;
|
||||
use nymsphinx::preparer::MessagePreparer;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
|
||||
|
||||
pub mod config;
|
||||
@@ -118,7 +134,7 @@ impl NymClient {
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
) -> FreshInputMessageChunker<OsRng> {
|
||||
let mut controller_config = client_core::client::real_messages_control::Config::new(
|
||||
self.key_manager.ack_key(),
|
||||
self.config.get_base().get_ack_wait_multiplier(),
|
||||
@@ -139,15 +155,20 @@ impl NymClient {
|
||||
|
||||
info!("Starting real traffic stream...");
|
||||
|
||||
RealMessagesController::new(
|
||||
let real_message_controller = RealMessagesController::new(
|
||||
controller_config,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
)
|
||||
.start_with_shutdown(shutdown);
|
||||
);
|
||||
|
||||
let fresh_input_msg_chunker = real_message_controller.get_fresh_input_message_chunker();
|
||||
|
||||
real_message_controller.start_with_shutdown(shutdown);
|
||||
|
||||
fresh_input_msg_chunker
|
||||
}
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
@@ -280,18 +301,23 @@ impl NymClient {
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
) {
|
||||
info!("Starting socks5 listener...");
|
||||
let auth_methods = vec![AuthenticationMethods::NoAuth as u8];
|
||||
let allowed_users: Vec<User> = Vec::new();
|
||||
|
||||
let authenticator = Authenticator::new(auth_methods, allowed_users);
|
||||
|
||||
let mut sphinx_socks = SphinxSocksServer::new(
|
||||
self.config.get_listening_port(),
|
||||
authenticator,
|
||||
self.config.get_provider_mix_address(),
|
||||
self.as_mix_recipient(),
|
||||
shutdown,
|
||||
//self.key_manager.ack_key(),
|
||||
//self.as_mix_recipient(),
|
||||
fresh_input_msg_chunker,
|
||||
);
|
||||
tokio::spawn(async move { sphinx_socks.serve(msg_input, buffer_requester).await });
|
||||
}
|
||||
@@ -361,7 +387,7 @@ impl NymClient {
|
||||
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
||||
|
||||
// channels responsible for controlling real messages
|
||||
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
|
||||
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(3);
|
||||
|
||||
// channels responsible for controlling ack messages
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
@@ -396,7 +422,7 @@ impl NymClient {
|
||||
let sphinx_message_sender =
|
||||
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
|
||||
|
||||
self.start_real_traffic_controller(
|
||||
let fresh_input_msg_chunker = self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
ack_receiver,
|
||||
@@ -421,6 +447,7 @@ impl NymClient {
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
shutdown.subscribe(),
|
||||
fresh_input_msg_chunker,
|
||||
);
|
||||
|
||||
info!("Client startup finished!");
|
||||
|
||||
@@ -6,24 +6,41 @@ use super::types::{ResponseCode, SocksProxyError};
|
||||
use super::{RESERVED, SOCKS_VERSION};
|
||||
use client_core::client::inbound_messages::InputMessage;
|
||||
use client_core::client::inbound_messages::InputMessageSender;
|
||||
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use futures::channel::mpsc;
|
||||
use futures::task::{Context, Poll};
|
||||
use log::*;
|
||||
use nymsphinx::acknowledgements::AckKey;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::preparer::MessagePreparer;
|
||||
use pin_project::pin_project;
|
||||
use proxy_helpers::connection_controller::{
|
||||
ConnectionReceiver, ControllerCommand, ControllerSender,
|
||||
};
|
||||
use proxy_helpers::proxy_runner::ProxyRunner;
|
||||
use rand::RngCore;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, Rng, RngCore};
|
||||
use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio::{self, net::TcpStream};
|
||||
|
||||
use client_core::client::{
|
||||
inbound_messages::InputMessageReceiver,
|
||||
real_messages_control::{
|
||||
acknowledgement_control::{
|
||||
action_controller::{Action, ActionSender},
|
||||
PendingAcknowledgement,
|
||||
},
|
||||
real_traffic_stream::{BatchRealMessageSender, RealMessage},
|
||||
},
|
||||
topology_control::TopologyAccessor,
|
||||
};
|
||||
|
||||
#[pin_project(project = StateProject)]
|
||||
enum StreamState {
|
||||
Available(TcpStream),
|
||||
@@ -142,6 +159,9 @@ pub(crate) struct SocksClient {
|
||||
self_address: Recipient,
|
||||
started_proxy: bool,
|
||||
shutdown_listener: ShutdownListener,
|
||||
//ack_key: Arc<AckKey>,
|
||||
//ack_recipient: Recipient,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
}
|
||||
|
||||
impl Drop for SocksClient {
|
||||
@@ -166,6 +186,9 @@ impl SocksClient {
|
||||
controller_sender: ControllerSender,
|
||||
self_address: Recipient,
|
||||
shutdown_listener: ShutdownListener,
|
||||
//ack_key: Arc<AckKey>,
|
||||
//ack_recipient: Recipient,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
) -> Self {
|
||||
let connection_id = Self::generate_random();
|
||||
SocksClient {
|
||||
@@ -180,6 +203,9 @@ impl SocksClient {
|
||||
self_address,
|
||||
started_proxy: false,
|
||||
shutdown_listener,
|
||||
//ack_key,
|
||||
//ack_recipient,
|
||||
fresh_input_msg_chunker,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,7 +257,9 @@ impl SocksClient {
|
||||
let msg = Message::Request(req);
|
||||
|
||||
let input_message = InputMessage::new_fresh(self.service_provider, msg.into_bytes(), false);
|
||||
self.input_sender.unbounded_send(input_message).unwrap();
|
||||
if self.input_sender.send(input_message).await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
|
||||
@@ -246,7 +274,21 @@ impl SocksClient {
|
||||
let connection_id = self.connection_id;
|
||||
let input_sender = self.input_sender.clone();
|
||||
|
||||
// WIP(JON)
|
||||
//self.fresh_input_msg_chunker;
|
||||
|
||||
let recipient = self.service_provider;
|
||||
|
||||
//let mut fresh_input_msg_chunker = self.fresh_input_msg_chunker.clone();
|
||||
//let fn_fresh_input_msg_chunker = move |conn_id, read_data, socket_closed| {
|
||||
// let provider_request = Request::new_send(conn_id, read_data, socket_closed);
|
||||
// let provider_message = Message::Request(provider_request);
|
||||
// let msg = InputMessage::new_fresh(recipient, provider_message.into_bytes(), false);
|
||||
// fresh_input_msg_chunker.on_input_message(msg)
|
||||
//};
|
||||
//let msg_chunker = Box::new(MsgChunker::new(self.fresh_input_msg_chunker.clone()));
|
||||
let msg_chunker = self.fresh_input_msg_chunker.clone();
|
||||
|
||||
let (stream, _) = ProxyRunner::new(
|
||||
stream,
|
||||
local_stream_remote,
|
||||
@@ -255,6 +297,7 @@ impl SocksClient {
|
||||
input_sender,
|
||||
connection_id,
|
||||
self.shutdown_listener.clone(),
|
||||
Some(Box::new(msg_chunker)),
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
|
||||
@@ -430,3 +473,184 @@ impl SocksClient {
|
||||
Ok(methods)
|
||||
}
|
||||
}
|
||||
|
||||
//pub struct MsgChunker {
|
||||
// fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
//}
|
||||
//
|
||||
//impl MsgChunker {
|
||||
// fn new(fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>) -> Self {
|
||||
// Self {
|
||||
// fresh_input_msg_chunker,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pub async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
// self.fresh_input_msg_chunker.on_input_message(msg);
|
||||
// }
|
||||
//}
|
||||
|
||||
//let mut fresh_input_msg_chunker = self.fresh_input_msg_chunker.clone();
|
||||
//let fn_fresh_input_msg_chunker = move |conn_id, read_data, socket_closed| {
|
||||
// let provider_request = Request::new_send(conn_id, read_data, socket_closed);
|
||||
// let provider_message = Message::Request(provider_request);
|
||||
// let msg = InputMessage::new_fresh(recipient, provider_message.into_bytes(), false);
|
||||
// fresh_input_msg_chunker.on_input_message(msg)
|
||||
//};
|
||||
|
||||
//struct FreshInputMessageChunker<R>
|
||||
//where
|
||||
// R: CryptoRng + Rng,
|
||||
//{
|
||||
// ack_key: Arc<AckKey>,
|
||||
// ack_recipient: Recipient,
|
||||
// message_preparer: MessagePreparer<R>,
|
||||
// action_sender: ActionSender,
|
||||
// real_message_sender: BatchRealMessageSender,
|
||||
// topology_access: TopologyAccessor,
|
||||
//}
|
||||
//
|
||||
//impl<R> FreshInputMessageChunker<R>
|
||||
//where
|
||||
// R: CryptoRng + Rng,
|
||||
//{
|
||||
// fn new(
|
||||
// ack_key: Arc<AckKey>,
|
||||
// ack_recipient: Recipient,
|
||||
// message_preparer: MessagePreparer<R>,
|
||||
// action_sender: ActionSender,
|
||||
// real_message_sender: BatchRealMessageSender,
|
||||
// topology_access: TopologyAccessor,
|
||||
// ) -> Self {
|
||||
// Self {
|
||||
// ack_key,
|
||||
// ack_recipient,
|
||||
// message_preparer,
|
||||
// action_sender,
|
||||
// real_message_sender,
|
||||
// topology_access,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // we require topology for replies to generate surb_acks
|
||||
// //async fn handle_reply(&mut self, reply_surb: ReplySurb, data: Vec<u8>) -> Option<RealMessage> {
|
||||
// // let topology_permit = self.topology_access.get_read_permit().await;
|
||||
// // let topology = match topology_permit.try_get_valid_topology_ref(&self.ack_recipient, None) {
|
||||
// // Some(topology_ref) => topology_ref,
|
||||
// // None => {
|
||||
// // warn!("Could not process the message - the network topology is invalid");
|
||||
// // return None;
|
||||
// // }
|
||||
// // };
|
||||
//
|
||||
// // match self
|
||||
// // .message_preparer
|
||||
// // .prepare_reply_for_use(data, reply_surb, topology, &self.ack_key)
|
||||
// // .await
|
||||
// // {
|
||||
// // Ok((mix_packet, reply_id)) => {
|
||||
// // // TODO: later probably write pending ack here
|
||||
// // // and deal with them....
|
||||
// // // ... somehow
|
||||
// // Some(RealMessage::new(mix_packet, reply_id))
|
||||
// // }
|
||||
// // Err(err) => {
|
||||
// // // TODO: should we have some mechanism to indicate to the user that the `reply_surb`
|
||||
// // // could be reused since technically it wasn't used up here?
|
||||
// // warn!("failed to deal with received reply surb - {:?}", err);
|
||||
// // None
|
||||
// // }
|
||||
// // }
|
||||
// //}
|
||||
//
|
||||
// async fn handle_fresh_message(
|
||||
// &mut self,
|
||||
// recipient: Recipient,
|
||||
// content: Vec<u8>,
|
||||
// with_reply_surb: bool,
|
||||
// ) -> Option<Vec<RealMessage>> {
|
||||
// let topology_permit = self.topology_access.get_read_permit().await;
|
||||
// let topology = match topology_permit
|
||||
// .try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
|
||||
// {
|
||||
// Some(topology_ref) => topology_ref,
|
||||
// None => {
|
||||
// warn!("Could not process the message - the network topology is invalid");
|
||||
// return None;
|
||||
// }
|
||||
// };
|
||||
//
|
||||
// // split the message, attach optional reply surb
|
||||
// let (split_message, reply_key) = self
|
||||
// .message_preparer
|
||||
// .prepare_and_split_message(content, with_reply_surb, topology)
|
||||
// .expect("somehow the topology was invalid after all!");
|
||||
//
|
||||
// #[cfg(feature = "reply-surb")]
|
||||
// if let Some(reply_key) = reply_key {
|
||||
// self.reply_key_storage
|
||||
// .insert_encryption_key(reply_key)
|
||||
// .expect("Failed to insert surb reply key to the store!")
|
||||
// }
|
||||
//
|
||||
// #[cfg(not(feature = "reply-surb"))]
|
||||
// let _reply_key = reply_key;
|
||||
//
|
||||
// // encrypt chunks, put them inside sphinx packets and generate acks
|
||||
// let mut pending_acks = Vec::with_capacity(split_message.len());
|
||||
// let mut real_messages = Vec::with_capacity(split_message.len());
|
||||
// for message_chunk in split_message {
|
||||
// // we need to clone it because we need to keep it in memory in case we had to retransmit
|
||||
// // it. And then we'd need to recreate entire ACK again.
|
||||
// let chunk_clone = message_chunk.clone();
|
||||
// let prepared_fragment = self
|
||||
// .message_preparer
|
||||
// .prepare_chunk_for_sending(chunk_clone, topology, &self.ack_key, &recipient)
|
||||
// .unwrap();
|
||||
//
|
||||
// real_messages.push(RealMessage::new(
|
||||
// prepared_fragment.mix_packet,
|
||||
// message_chunk.fragment_identifier(),
|
||||
// ));
|
||||
//
|
||||
// pending_acks.push(PendingAcknowledgement::new(
|
||||
// message_chunk,
|
||||
// prepared_fragment.total_delay,
|
||||
// recipient,
|
||||
// ));
|
||||
// }
|
||||
//
|
||||
// // tells the controller to put this into the hashmap
|
||||
// self.action_sender
|
||||
// .unbounded_send(Action::new_insert(pending_acks))
|
||||
// .unwrap();
|
||||
//
|
||||
// Some(real_messages)
|
||||
// }
|
||||
//
|
||||
// pub async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
// let real_messages = match msg {
|
||||
// InputMessage::Fresh {
|
||||
// recipient,
|
||||
// data,
|
||||
// with_reply_surb,
|
||||
// } => {
|
||||
// self.handle_fresh_message(recipient, data, with_reply_surb)
|
||||
// .await
|
||||
// }
|
||||
// InputMessage::Reply { .. } => panic!(),
|
||||
// //InputMessage::Reply { reply_surb, data } => self
|
||||
// // .handle_reply(reply_surb, data)
|
||||
// // .await
|
||||
// // .map(|message| vec![message]),
|
||||
// };
|
||||
//
|
||||
// // there's no point in trying to send nothing
|
||||
// if let Some(real_messages) = real_messages {
|
||||
// // tells real message sender (with the poisson timer) to send this to the mix network
|
||||
// self.real_message_sender
|
||||
// .unbounded_send(real_messages)
|
||||
// .unwrap();
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
@@ -4,13 +4,17 @@ use super::{
|
||||
mixnet_responses::MixnetResponseListener,
|
||||
types::{ResponseCode, SocksProxyError},
|
||||
};
|
||||
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use client_core::client::{
|
||||
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
|
||||
};
|
||||
use log::*;
|
||||
use nymsphinx::acknowledgements::AckKey;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::Controller;
|
||||
use rand::rngs::OsRng;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
@@ -21,6 +25,9 @@ pub struct SphinxSocksServer {
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
shutdown: ShutdownListener,
|
||||
//ack_key: Arc<AckKey>,
|
||||
//ack_recipient: Recipient,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
}
|
||||
|
||||
impl SphinxSocksServer {
|
||||
@@ -31,6 +38,9 @@ impl SphinxSocksServer {
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
shutdown: ShutdownListener,
|
||||
//ack_key: Arc<AckKey>,
|
||||
//ack_recipient: Recipient,
|
||||
fresh_input_msg_chunker: FreshInputMessageChunker<OsRng>,
|
||||
) -> Self {
|
||||
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
|
||||
// just modify the config
|
||||
@@ -42,6 +52,9 @@ impl SphinxSocksServer {
|
||||
service_provider,
|
||||
self_address,
|
||||
shutdown,
|
||||
//ack_key,
|
||||
//ack_recipient,
|
||||
fresh_input_msg_chunker,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,6 +97,9 @@ impl SphinxSocksServer {
|
||||
controller_sender.clone(),
|
||||
self.self_address,
|
||||
self.shutdown.clone(),
|
||||
//self.ack_key.clone(),
|
||||
//self.ack_recipient.clone(),
|
||||
self.fresh_input_msg_chunker.clone(),
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -18,5 +18,9 @@ socks5-requests = { path = "../requests" }
|
||||
ordered-buffer = { path = "../ordered-buffer" }
|
||||
task = { path = "../../task" }
|
||||
|
||||
client-core = { path = "../../../clients/client-core" }
|
||||
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
|
||||
async-trait = "0.1.58"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4.2"
|
||||
|
||||
@@ -1,21 +1,25 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use super::Chunker;
|
||||
use super::MixProxySender;
|
||||
use super::SHUTDOWN_TIMEOUT;
|
||||
use crate::available_reader::AvailableReader;
|
||||
use bytes::Bytes;
|
||||
use client_core::client::real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::OrderedMessageSender;
|
||||
use rand::rngs::OsRng;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::time::Duration;
|
||||
use std::{io, sync::Arc};
|
||||
use task::ShutdownListener;
|
||||
use tokio::select;
|
||||
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
|
||||
|
||||
fn send_empty_close<F, S>(
|
||||
async fn send_empty_close<F, S>(
|
||||
connection_id: ConnectionId,
|
||||
message_sender: &mut OrderedMessageSender,
|
||||
mix_sender: &MixProxySender<S>,
|
||||
@@ -24,12 +28,16 @@ fn send_empty_close<F, S>(
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
||||
{
|
||||
let ordered_msg = message_sender.wrap_message(Vec::new()).into_bytes();
|
||||
mix_sender
|
||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, true))
|
||||
.unwrap();
|
||||
if mix_sender
|
||||
.send(adapter_fn(connection_id, ordered_msg, true))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
};
|
||||
}
|
||||
|
||||
fn deal_with_data<F, S>(
|
||||
async fn deal_with_data<F, S>(
|
||||
read_data: Option<io::Result<Bytes>>,
|
||||
local_destination_address: &str,
|
||||
remote_source_address: &str,
|
||||
@@ -37,6 +45,7 @@ fn deal_with_data<F, S>(
|
||||
message_sender: &mut OrderedMessageSender,
|
||||
mix_sender: &MixProxySender<S>,
|
||||
adapter_fn: F,
|
||||
msg_chunker: &mut Option<Box<dyn Chunker<S>>>,
|
||||
) -> bool
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
||||
@@ -63,9 +72,42 @@ where
|
||||
|
||||
// if we're sending through the mixnet increase the sequence number...
|
||||
let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes();
|
||||
mix_sender
|
||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||
.unwrap();
|
||||
|
||||
// WIP(JON): here we do the chunking, and send to real_message_sender instead
|
||||
if let Some(chunker) = msg_chunker {
|
||||
log::info!("({connection_id}): chunking and sending");
|
||||
let msg = adapter_fn(connection_id, ordered_msg, is_finished);
|
||||
chunker.chunk(msg).await;
|
||||
log::info!("({connection_id}): chunking and sending done");
|
||||
} else {
|
||||
log::info!("ordered_msg.len: {}", ordered_msg.len());
|
||||
if ordered_msg.len() > 10000 {
|
||||
log::info!("Sending large");
|
||||
log::info!("capacity: {}", mix_sender.capacity());
|
||||
loop {
|
||||
if mix_sender.capacity() > 2 {
|
||||
if mix_sender
|
||||
.send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
} else {
|
||||
log::info!("Sending smaller");
|
||||
if mix_sender
|
||||
.send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if is_finished {
|
||||
// technically we already informed it when we sent the message to mixnet above
|
||||
@@ -85,6 +127,7 @@ pub(super) async fn run_inbound<F, S>(
|
||||
adapter_fn: F,
|
||||
shutdown_notify: Arc<Notify>,
|
||||
mut shutdown_listener: ShutdownListener,
|
||||
mut msg_chunker: Option<Box<dyn Chunker<S>>>,
|
||||
) -> OwnedReadHalf
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
||||
@@ -98,7 +141,7 @@ where
|
||||
loop {
|
||||
select! {
|
||||
read_data = &mut available_reader.next() => {
|
||||
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
|
||||
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn, &mut msg_chunker).await {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -106,7 +149,7 @@ where
|
||||
debug!("closing inbound proxy after outbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
|
||||
// inform remote just in case it was closed because of lack of heartbeat.
|
||||
// worst case the remote will just have couple of false negatives
|
||||
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
|
||||
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn).await;
|
||||
break;
|
||||
}
|
||||
_ = shutdown_listener.recv() => {
|
||||
|
||||
@@ -2,17 +2,25 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::connection_controller::ConnectionReceiver;
|
||||
use async_trait::async_trait;
|
||||
use futures::channel::mpsc;
|
||||
use rand::rngs::OsRng;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use task::ShutdownListener;
|
||||
use tokio::{net::TcpStream, sync::Notify};
|
||||
|
||||
use client_core::client::{
|
||||
inbound_messages::InputMessage,
|
||||
real_messages_control::acknowledgement_control::input_message_listener::FreshInputMessageChunker,
|
||||
};
|
||||
|
||||
mod inbound;
|
||||
mod outbound;
|
||||
|
||||
// TODO: make this configurable
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
//const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 10);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProxyMessage {
|
||||
@@ -29,11 +37,11 @@ impl From<(Vec<u8>, bool)> for ProxyMessage {
|
||||
}
|
||||
}
|
||||
|
||||
pub type MixProxySender<S> = mpsc::UnboundedSender<S>;
|
||||
pub type MixProxySender<S> = tokio::sync::mpsc::Sender<S>;
|
||||
|
||||
// TODO: when we finally get to implementing graceful shutdown,
|
||||
// on Drop this guy should tell the remote that it's closed now
|
||||
#[derive(Debug)]
|
||||
//#[derive(Debug)]
|
||||
pub struct ProxyRunner<S> {
|
||||
/// receives data from the mix network and sends that into the socket
|
||||
mix_receiver: Option<ConnectionReceiver>,
|
||||
@@ -48,6 +56,26 @@ pub struct ProxyRunner<S> {
|
||||
|
||||
// Listens to shutdown commands from higher up
|
||||
shutdown_listener: ShutdownListener,
|
||||
|
||||
msg_chunker: Option<Box<dyn Chunker<S>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Chunker<S>: Send {
|
||||
async fn chunk(&mut self, msg: S);
|
||||
|
||||
fn clone_box(&self) -> Box<dyn Chunker<S>>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Chunker<InputMessage> for FreshInputMessageChunker<OsRng> {
|
||||
async fn chunk(&mut self, msg: InputMessage) {
|
||||
self.on_input_message(msg).await;
|
||||
}
|
||||
|
||||
fn clone_box(&self) -> Box<dyn Chunker<InputMessage>> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> ProxyRunner<S>
|
||||
@@ -62,6 +90,7 @@ where
|
||||
mix_sender: MixProxySender<S>,
|
||||
connection_id: ConnectionId,
|
||||
shutdown_listener: ShutdownListener,
|
||||
msg_chunker: Option<Box<dyn Chunker<S>>>,
|
||||
) -> Self {
|
||||
ProxyRunner {
|
||||
mix_receiver: Some(mix_receiver),
|
||||
@@ -71,6 +100,7 @@ where
|
||||
remote_source_address,
|
||||
connection_id,
|
||||
shutdown_listener,
|
||||
msg_chunker,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,10 +108,12 @@ where
|
||||
// request/response as required by entity running particular side of the proxy.
|
||||
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + Sync + 'static,
|
||||
{
|
||||
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
||||
let shutdown_notify = Arc::new(Notify::new());
|
||||
//let chunker = Some(self.msg_chunker.as_ref().unwrap().clone_box());
|
||||
let chunker = self.msg_chunker.as_ref().map(|c| c.clone_box());
|
||||
|
||||
// should run until either inbound closes or is notified from outbound
|
||||
let inbound_future = inbound::run_inbound(
|
||||
@@ -93,6 +125,7 @@ where
|
||||
adapter_fn,
|
||||
Arc::clone(&shutdown_notify),
|
||||
self.shutdown_listener.clone(),
|
||||
chunker,
|
||||
);
|
||||
|
||||
let outbound_future = outbound::run_outbound(
|
||||
|
||||
@@ -40,7 +40,7 @@ impl Connection {
|
||||
pub(crate) async fn run_proxy(
|
||||
&mut self,
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let stream = self.conn.take().unwrap();
|
||||
@@ -55,6 +55,7 @@ impl Connection {
|
||||
mix_sender,
|
||||
connection_id,
|
||||
shutdown,
|
||||
None,
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
(
|
||||
|
||||
@@ -66,11 +66,11 @@ impl ServiceProvider {
|
||||
/// via the `websocket_writer`.
|
||||
async fn mixnet_response_listener(
|
||||
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
|
||||
mut mix_reader: mpsc::UnboundedReceiver<(Socks5Message, Recipient)>,
|
||||
mut mix_reader: tokio::sync::mpsc::Receiver<(Socks5Message, Recipient)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
) {
|
||||
// TODO: wire SURBs in here once they're available
|
||||
while let Some((msg, return_address)) = mix_reader.next().await {
|
||||
while let Some((msg, return_address)) = mix_reader.recv().await {
|
||||
if let Some(stats_collector) = stats_collector.as_ref() {
|
||||
if let Some(remote_addr) = stats_collector
|
||||
.connected_services
|
||||
@@ -134,7 +134,7 @@ impl ServiceProvider {
|
||||
remote_addr: String,
|
||||
return_address: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||
@@ -147,12 +147,16 @@ impl ServiceProvider {
|
||||
);
|
||||
|
||||
// inform the remote that the connection is closed before it even was established
|
||||
mix_input_sender
|
||||
.unbounded_send((
|
||||
if mix_input_sender
|
||||
.send((
|
||||
Socks5Message::Response(Response::new(conn_id, Vec::new(), true)),
|
||||
return_address,
|
||||
))
|
||||
.unwrap();
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -188,10 +192,10 @@ impl ServiceProvider {
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_proxy_connect(
|
||||
async fn handle_proxy_connect(
|
||||
&mut self,
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
return_address: Recipient,
|
||||
@@ -200,14 +204,18 @@ impl ServiceProvider {
|
||||
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
||||
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
|
||||
log::info!("{}", log_msg);
|
||||
mix_input_sender
|
||||
.unbounded_send((
|
||||
if mix_input_sender
|
||||
.send((
|
||||
Socks5Message::NetworkRequesterResponse(NetworkRequesterResponse::new(
|
||||
conn_id, log_msg,
|
||||
)),
|
||||
return_address,
|
||||
))
|
||||
.unwrap();
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -244,7 +252,7 @@ impl ServiceProvider {
|
||||
&mut self,
|
||||
raw_request: &[u8],
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
@@ -273,6 +281,7 @@ impl ServiceProvider {
|
||||
req.return_address,
|
||||
shutdown,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
Request::Send(conn_id, data, closed) => {
|
||||
@@ -307,7 +316,7 @@ impl ServiceProvider {
|
||||
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
|
||||
// going to be used by `mixnet_response_listener`
|
||||
let (mix_input_sender, mix_input_receiver) =
|
||||
mpsc::unbounded::<(Socks5Message, Recipient)>();
|
||||
tokio::sync::mpsc::channel::<(Socks5Message, Recipient)>(3);
|
||||
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
|
||||
@@ -77,13 +77,13 @@ pub struct ServiceStatisticsCollector {
|
||||
pub(crate) response_stats_data: Arc<RwLock<StatsData>>,
|
||||
pub(crate) connected_services: Arc<RwLock<HashMap<ConnectionId, RemoteAddress>>>,
|
||||
stats_provider_addr: Recipient,
|
||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
}
|
||||
|
||||
impl ServiceStatisticsCollector {
|
||||
pub async fn new(
|
||||
stats_provider_addr: Option<Recipient>,
|
||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||
) -> Result<Self, StatsError> {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(3))
|
||||
@@ -175,20 +175,27 @@ impl StatisticsCollector for ServiceStatisticsCollector {
|
||||
),
|
||||
self.stats_provider_addr,
|
||||
);
|
||||
self.mix_input_sender
|
||||
.unbounded_send((
|
||||
if self
|
||||
.mix_input_sender
|
||||
.send((
|
||||
Socks5Message::Request(connect_req),
|
||||
self.stats_provider_addr,
|
||||
))
|
||||
.unwrap();
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!();
|
||||
}
|
||||
|
||||
trace!("Sending data to statistics service");
|
||||
let mut message_sender = OrderedMessageSender::new();
|
||||
let ordered_msg = message_sender.wrap_message(msg).into_bytes();
|
||||
let send_req = Request::new_send(conn_id, ordered_msg, true);
|
||||
self.mix_input_sender
|
||||
.unbounded_send((Socks5Message::Request(send_req), self.stats_provider_addr))
|
||||
.unwrap();
|
||||
if self.mix_input_sender
|
||||
.send((Socks5Message::Request(send_req), self.stats_provider_addr))
|
||||
.await.is_err() {
|
||||
panic!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user