Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ea401b72e8 | |||
| 21471a78c3 | |||
| 9a7a7f5615 | |||
| 8e69a43f27 | |||
| 3b4664fee9 | |||
| 5feb354568 | |||
| 777053bda1 | |||
| ed47942a53 | |||
| ec5f789c16 | |||
| 37381f9f1a | |||
| 3c9774314f | |||
| 100634099e | |||
| 51a2c9c180 | |||
| e0f5455739 | |||
| 48de8c5cf7 | |||
| c23ec0e8f8 | |||
| 1f4f153963 | |||
| 67af175e18 |
@@ -27,6 +27,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
|||||||
- socks5 client: graceful shutdown should fix error on disconnect in nym-connect ([#1591])
|
- socks5 client: graceful shutdown should fix error on disconnect in nym-connect ([#1591])
|
||||||
- wasm-client: fixed build errors on MacOS and changed example JS code to use mainnet ([#1585])
|
- wasm-client: fixed build errors on MacOS and changed example JS code to use mainnet ([#1585])
|
||||||
- gateway-client: will attempt to read now as many as 8 websocket messages at once, assuming they're already available on the socket ([#1669])
|
- gateway-client: will attempt to read now as many as 8 websocket messages at once, assuming they're already available on the socket ([#1669])
|
||||||
|
- clients: bound the sphinx packet channel and reduce sending rate if gateway can't keep up ([#1703])
|
||||||
|
|
||||||
[#1541]: https://github.com/nymtech/nym/pull/1541
|
[#1541]: https://github.com/nymtech/nym/pull/1541
|
||||||
[#1558]: https://github.com/nymtech/nym/pull/1558
|
[#1558]: https://github.com/nymtech/nym/pull/1558
|
||||||
@@ -41,6 +42,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
|||||||
[#1671]: https://github.com/nymtech/nym/pull/1671
|
[#1671]: https://github.com/nymtech/nym/pull/1671
|
||||||
[#1673]: https://github.com/nymtech/nym/pull/1673
|
[#1673]: https://github.com/nymtech/nym/pull/1673
|
||||||
[#1702]: https://github.com/nymtech/nym/pull/1702
|
[#1702]: https://github.com/nymtech/nym/pull/1702
|
||||||
|
[#1703]: https://github.com/nymtech/nym/pull/1703
|
||||||
|
|
||||||
|
|
||||||
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use rand::{rngs::OsRng, CryptoRng, Rng};
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
@@ -171,11 +172,18 @@ impl LoopCoverTrafficStream<OsRng> {
|
|||||||
)
|
)
|
||||||
.expect("Somehow failed to generate a loop cover message with a valid topology");
|
.expect("Somehow failed to generate a loop cover message with a valid topology");
|
||||||
|
|
||||||
// if this one fails, there's no retrying because it means that either:
|
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
|
||||||
// - we run out of memory
|
match err {
|
||||||
// - the receiver channel is closed
|
TrySendError::Full(_) => {
|
||||||
// in either case there's no recovery and we can only panic
|
// This isn't a problem, if the channel is full means we're already sending the
|
||||||
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
|
// max amount of messages downstream can handle.
|
||||||
|
log::debug!("Failed to send cover message - channel full");
|
||||||
|
}
|
||||||
|
TrySendError::Closed(_) => {
|
||||||
|
log::warn!("Failed to send cover message - channel closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
|
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
|
||||||
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
|
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
|
||||||
|
|||||||
@@ -2,8 +2,10 @@ use futures::channel::mpsc;
|
|||||||
use nymsphinx::addressing::clients::Recipient;
|
use nymsphinx::addressing::clients::Recipient;
|
||||||
use nymsphinx::anonymous_replies::ReplySurb;
|
use nymsphinx::anonymous_replies::ReplySurb;
|
||||||
|
|
||||||
pub type InputMessageSender = mpsc::UnboundedSender<InputMessage>;
|
//pub type InputMessageSender = mpsc::Sender<InputMessage>;
|
||||||
pub type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
|
//pub type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
|
||||||
|
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
|
||||||
|
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum InputMessage {
|
pub enum InputMessage {
|
||||||
|
|||||||
@@ -2,15 +2,15 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::spawn_future;
|
use crate::spawn_future;
|
||||||
use futures::channel::mpsc;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use gateway_client::GatewayClient;
|
use gateway_client::GatewayClient;
|
||||||
use log::*;
|
use log::*;
|
||||||
use nymsphinx::forwarding::packet::MixPacket;
|
use nymsphinx::forwarding::packet::MixPacket;
|
||||||
|
|
||||||
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
|
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||||
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
|
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||||
|
|
||||||
|
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
|
||||||
|
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
|
||||||
const MAX_FAILURE_COUNT: usize = 100;
|
const MAX_FAILURE_COUNT: usize = 100;
|
||||||
|
|
||||||
pub struct MixTrafficController {
|
pub struct MixTrafficController {
|
||||||
@@ -25,15 +25,17 @@ pub struct MixTrafficController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl MixTrafficController {
|
impl MixTrafficController {
|
||||||
pub fn new(
|
pub fn new(gateway_client: GatewayClient) -> (MixTrafficController, BatchMixMessageSender) {
|
||||||
mix_rx: BatchMixMessageReceiver,
|
let (sphinx_message_sender, sphinx_message_receiver) =
|
||||||
gateway_client: GatewayClient,
|
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
|
||||||
) -> MixTrafficController {
|
(
|
||||||
MixTrafficController {
|
MixTrafficController {
|
||||||
gateway_client,
|
gateway_client,
|
||||||
mix_rx,
|
mix_rx: sphinx_message_receiver,
|
||||||
consecutive_gateway_failure_count: 0,
|
consecutive_gateway_failure_count: 0,
|
||||||
}
|
},
|
||||||
|
sphinx_message_sender,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||||
@@ -72,7 +74,7 @@ impl MixTrafficController {
|
|||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
mix_packets = self.mix_rx.next() => match mix_packets {
|
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||||
Some(mix_packets) => {
|
Some(mix_packets) => {
|
||||||
self.on_messages(mix_packets).await;
|
self.on_messages(mix_packets).await;
|
||||||
},
|
},
|
||||||
@@ -96,7 +98,7 @@ impl MixTrafficController {
|
|||||||
spawn_future(async move {
|
spawn_future(async move {
|
||||||
debug!("Started MixTrafficController without graceful shutdown support");
|
debug!("Started MixTrafficController without graceful shutdown support");
|
||||||
|
|
||||||
while let Some(mix_packets) = self.mix_rx.next().await {
|
while let Some(mix_packets) = self.mix_rx.recv().await {
|
||||||
self.on_messages(mix_packets).await;
|
self.on_messages(mix_packets).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
+1
-1
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
async fn on_ack(&mut self, ack_content: Vec<u8>) {
|
||||||
debug!("Received an ack");
|
trace!("Received an ack");
|
||||||
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
|
||||||
.map(FragmentIdentifier::try_from_bytes)
|
.map(FragmentIdentifier::try_from_bytes)
|
||||||
{
|
{
|
||||||
|
|||||||
+9
-4
@@ -15,6 +15,7 @@ use nymsphinx::preparer::MessagePreparer;
|
|||||||
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
|
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
|
||||||
use rand::{CryptoRng, Rng};
|
use rand::{CryptoRng, Rng};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::thread::panicking;
|
||||||
|
|
||||||
#[cfg(feature = "reply-surb")]
|
#[cfg(feature = "reply-surb")]
|
||||||
use crate::client::reply_key_storage::ReplyKeyStorage;
|
use crate::client::reply_key_storage::ReplyKeyStorage;
|
||||||
@@ -182,9 +183,13 @@ where
|
|||||||
// there's no point in trying to send nothing
|
// there's no point in trying to send nothing
|
||||||
if let Some(real_messages) = real_messages {
|
if let Some(real_messages) = real_messages {
|
||||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||||
self.real_message_sender
|
log::info!("real_message_sender capacity: {}", self.real_message_sender.capacity());
|
||||||
.unbounded_send(real_messages)
|
if let Err(err) = self.real_message_sender
|
||||||
.unwrap();
|
.send(real_messages)
|
||||||
|
.await {
|
||||||
|
log::error!("Failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,7 +199,7 @@ where
|
|||||||
|
|
||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
input_msg = self.input_receiver.next() => match input_msg {
|
input_msg = self.input_receiver.recv() => match input_msg {
|
||||||
Some(input_msg) => {
|
Some(input_msg) => {
|
||||||
self.on_input_message(input_msg).await;
|
self.on_input_message(input_msg).await;
|
||||||
},
|
},
|
||||||
|
|||||||
+8
-3
@@ -112,12 +112,17 @@ where
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// send to `OutQueueControl` to eventually send to the mix network
|
// send to `OutQueueControl` to eventually send to the mix network
|
||||||
self.real_message_sender
|
if let Err(err) = self
|
||||||
.unbounded_send(vec![RealMessage::new(
|
.real_message_sender
|
||||||
|
.send(vec![RealMessage::new(
|
||||||
prepared_fragment.mix_packet,
|
prepared_fragment.mix_packet,
|
||||||
frag_id,
|
frag_id,
|
||||||
)])
|
)])
|
||||||
.unwrap();
|
.await
|
||||||
|
{
|
||||||
|
log::error!("Failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
|||||||
@@ -113,7 +113,8 @@ impl RealMessagesController<OsRng> {
|
|||||||
) -> Self {
|
) -> Self {
|
||||||
let rng = OsRng;
|
let rng = OsRng;
|
||||||
|
|
||||||
let (real_message_sender, real_message_receiver) = mpsc::unbounded();
|
//let (real_message_sender, real_message_receiver) = mpsc::unbounded();
|
||||||
|
let (real_message_sender, real_message_receiver) = tokio::sync::mpsc::channel(16);
|
||||||
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
|
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
|
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
|
||||||
|
|||||||
@@ -27,6 +27,23 @@ use tokio::time;
|
|||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
use wasm_timer;
|
use wasm_timer;
|
||||||
|
|
||||||
|
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
|
||||||
|
// the available buffer space we want to take somewhat swift action, but we still need to give a
|
||||||
|
// short time to give the channel a chance reduce pressure.
|
||||||
|
const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
|
||||||
|
// The minimum time between decreasing the average delay between packets. We don't want to change
|
||||||
|
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
|
||||||
|
// wait a little to see the effect before we decrease further.
|
||||||
|
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
|
||||||
|
// If we enough time passes without any sign of backpressure in the channel, we can consider
|
||||||
|
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
|
||||||
|
// bandwidth at all times.
|
||||||
|
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
|
||||||
|
// The maximum multiplier we apply to the base average Poisson delay.
|
||||||
|
const MAX_DELAY_MULTIPLIER: u32 = 6;
|
||||||
|
// The minium multiplier we apply to the base average Poisson delay.
|
||||||
|
const MIN_DELAY_MULTIPLIER: u32 = 1;
|
||||||
|
|
||||||
/// Configurable parameters of the `OutQueueControl`
|
/// Configurable parameters of the `OutQueueControl`
|
||||||
pub(crate) struct Config {
|
pub(crate) struct Config {
|
||||||
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
||||||
@@ -68,6 +85,101 @@ impl Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct SendingDelayController {
|
||||||
|
/// Multiply the average sending delay.
|
||||||
|
/// This is normally set to unity, but if we detect backpressure we increase this
|
||||||
|
/// multiplier. We use discrete steps.
|
||||||
|
current_multiplier: u32,
|
||||||
|
|
||||||
|
/// Maximum delay multiplier
|
||||||
|
upper_bound: u32,
|
||||||
|
|
||||||
|
/// Minimum delay multiplier
|
||||||
|
lower_bound: u32,
|
||||||
|
|
||||||
|
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
|
||||||
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
time_when_changed: time::Instant,
|
||||||
|
|
||||||
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
time_when_changed: wasm_timer::Instant,
|
||||||
|
|
||||||
|
/// If we have a long enough time without any backpressure detected we try reducing the sending
|
||||||
|
/// delay multiplier
|
||||||
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
time_when_backpressure_detected: time::Instant,
|
||||||
|
|
||||||
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
time_when_backpressure_detected: wasm_timer::Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
fn get_time_now() -> time::Instant {
|
||||||
|
time::Instant::now()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
fn get_time_now() -> wasm_timer::Instant {
|
||||||
|
wasm_timer::Instant::now()
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendingDelayController {
|
||||||
|
fn new(lower_bound: u32, upper_bound: u32) -> Self {
|
||||||
|
assert!(lower_bound <= upper_bound);
|
||||||
|
let now = get_time_now();
|
||||||
|
SendingDelayController {
|
||||||
|
current_multiplier: MIN_DELAY_MULTIPLIER,
|
||||||
|
upper_bound,
|
||||||
|
lower_bound,
|
||||||
|
time_when_changed: now,
|
||||||
|
time_when_backpressure_detected: now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current_multiplier(&self) -> u32 {
|
||||||
|
self.current_multiplier
|
||||||
|
}
|
||||||
|
|
||||||
|
fn increase_delay_multiplier(&mut self) {
|
||||||
|
self.current_multiplier =
|
||||||
|
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
|
||||||
|
self.time_when_changed = get_time_now();
|
||||||
|
log::debug!(
|
||||||
|
"Increasing sending delay multiplier to: {}",
|
||||||
|
self.current_multiplier
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decrease_delay_multiplier(&mut self) {
|
||||||
|
self.current_multiplier =
|
||||||
|
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
|
||||||
|
self.time_when_changed = get_time_now();
|
||||||
|
log::debug!(
|
||||||
|
"Decreasing sending delay multiplier to: {}",
|
||||||
|
self.current_multiplier
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_backpressure_detected(&mut self) {
|
||||||
|
self.time_when_backpressure_detected = get_time_now();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn not_increased_delay_recently(&self) -> bool {
|
||||||
|
get_time_now()
|
||||||
|
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_sending_reliable(&self) -> bool {
|
||||||
|
let now = get_time_now();
|
||||||
|
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
|
||||||
|
let acceptable_time_without_backpressure =
|
||||||
|
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
|
||||||
|
|
||||||
|
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
|
||||||
|
&& now > self.time_when_changed + delay_change_interval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct OutQueueControl<R>
|
pub(crate) struct OutQueueControl<R>
|
||||||
where
|
where
|
||||||
R: CryptoRng + Rng,
|
R: CryptoRng + Rng,
|
||||||
@@ -89,6 +201,10 @@ where
|
|||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
|
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
|
||||||
|
|
||||||
|
// To make sure we don't overload the mix_tx channel, we limit the rate we are pushing
|
||||||
|
// messages.
|
||||||
|
sending_rate_controller: SendingDelayController,
|
||||||
|
|
||||||
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
|
||||||
/// out to the network without any further delays.
|
/// out to the network without any further delays.
|
||||||
mix_tx: BatchMixMessageSender,
|
mix_tx: BatchMixMessageSender,
|
||||||
@@ -126,8 +242,10 @@ impl RealMessage {
|
|||||||
|
|
||||||
// messages are already prepared, etc. the real point of it is to forward it to mix_traffic
|
// messages are already prepared, etc. the real point of it is to forward it to mix_traffic
|
||||||
// after sufficient delay
|
// after sufficient delay
|
||||||
pub(crate) type BatchRealMessageSender = mpsc::UnboundedSender<Vec<RealMessage>>;
|
//pub(crate) type BatchRealMessageSender = mpsc::UnboundedSender<Vec<RealMessage>>;
|
||||||
type BatchRealMessageReceiver = mpsc::UnboundedReceiver<Vec<RealMessage>>;
|
//type BatchRealMessageReceiver = mpsc::UnboundedReceiver<Vec<RealMessage>>;
|
||||||
|
pub(crate) type BatchRealMessageSender = tokio::sync::mpsc::Sender<Vec<RealMessage>>;
|
||||||
|
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<Vec<RealMessage>>;
|
||||||
|
|
||||||
pub(crate) enum StreamMessage {
|
pub(crate) enum StreamMessage {
|
||||||
Cover,
|
Cover,
|
||||||
@@ -156,6 +274,10 @@ where
|
|||||||
ack_key,
|
ack_key,
|
||||||
sent_notifier,
|
sent_notifier,
|
||||||
next_delay: None,
|
next_delay: None,
|
||||||
|
sending_rate_controller: SendingDelayController::new(
|
||||||
|
MIN_DELAY_MULTIPLIER,
|
||||||
|
MAX_DELAY_MULTIPLIER,
|
||||||
|
),
|
||||||
mix_tx,
|
mix_tx,
|
||||||
real_receiver,
|
real_receiver,
|
||||||
our_full_destination,
|
our_full_destination,
|
||||||
@@ -212,15 +334,8 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// if this one fails, there's no retrying because it means that either:
|
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||||
// - we run out of memory
|
log::error!("Failed to send - channel closed: {}", err);
|
||||||
// - the receiver channel is closed
|
|
||||||
// in either case there's no recovery and we can only panic
|
|
||||||
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
|
|
||||||
log::warn!(
|
|
||||||
"Failed to send {} packets (possible process shutdown?)",
|
|
||||||
err.into_inner().len()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
|
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
|
||||||
@@ -234,7 +349,44 @@ where
|
|||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn current_average_message_sending_delay(&self) -> Duration {
|
||||||
|
self.config.average_message_sending_delay
|
||||||
|
* self.sending_rate_controller.current_multiplier()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn adjust_current_average_message_sending_delay(&mut self) {
|
||||||
|
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
|
||||||
|
log::trace!(
|
||||||
|
"used_slots: {used_slots}, current_multiplier: {}",
|
||||||
|
self.sending_rate_controller.current_multiplier()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Even just a single used slot is enough to signal backpressure
|
||||||
|
if used_slots > 0 {
|
||||||
|
log::trace!("Backpressure detected");
|
||||||
|
self.sending_rate_controller.record_backpressure_detected();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the buffer is running out, slow down the sending rate
|
||||||
|
if self.mix_tx.capacity() == 0
|
||||||
|
&& self.sending_rate_controller.not_increased_delay_recently()
|
||||||
|
{
|
||||||
|
self.sending_rate_controller.increase_delay_multiplier();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Very carefully step up the sending rate in case it seems like we can solidly handle the
|
||||||
|
// current rate.
|
||||||
|
if self.sending_rate_controller.is_sending_reliable() {
|
||||||
|
self.sending_rate_controller.decrease_delay_multiplier();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
|
// The average delay could change depending on if backpressure in the downstream channel
|
||||||
|
// (mix_tx) was detected.
|
||||||
|
self.adjust_current_average_message_sending_delay();
|
||||||
|
let avg_delay = self.current_average_message_sending_delay();
|
||||||
|
|
||||||
if let Some(ref mut next_delay) = &mut self.next_delay {
|
if let Some(ref mut next_delay) = &mut self.next_delay {
|
||||||
// it is not yet time to return a message
|
// it is not yet time to return a message
|
||||||
if next_delay.as_mut().poll(cx).is_pending() {
|
if next_delay.as_mut().poll(cx).is_pending() {
|
||||||
@@ -243,7 +395,6 @@ where
|
|||||||
|
|
||||||
// we know it's time to send a message, so let's prepare delay for the next one
|
// we know it's time to send a message, so let's prepare delay for the next one
|
||||||
// Get the `now` by looking at the current `delay` deadline
|
// Get the `now` by looking at the current `delay` deadline
|
||||||
let avg_delay = self.config.average_message_sending_delay;
|
|
||||||
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
|
||||||
|
|
||||||
// The next interval value is `next_poisson_delay` after the one that just
|
// The next interval value is `next_poisson_delay` after the one that just
|
||||||
@@ -266,7 +417,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// decide what kind of message to send
|
// decide what kind of message to send
|
||||||
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||||
// (and whoever is using the stream should panic)
|
// (and whoever is using the stream should panic)
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
@@ -313,7 +464,7 @@ where
|
|||||||
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
|
||||||
}
|
}
|
||||||
|
|
||||||
match Pin::new(&mut self.real_receiver).poll_next(cx) {
|
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||||
// (and whoever is using the stream should panic)
|
// (and whoever is using the stream should panic)
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
|
|||||||
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
|
||||||
debug!(
|
trace!(
|
||||||
"Processing {:?} new message that might get added to the buffer!",
|
"Processing {:?} new message that might get added to the buffer!",
|
||||||
msgs.len()
|
msgs.len()
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -6,9 +6,7 @@ use client_core::client::inbound_messages::{
|
|||||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||||
};
|
};
|
||||||
use client_core::client::key_manager::KeyManager;
|
use client_core::client::key_manager::KeyManager;
|
||||||
use client_core::client::mix_traffic::{
|
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
|
||||||
BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController,
|
|
||||||
};
|
|
||||||
use client_core::client::real_messages_control;
|
use client_core::client::real_messages_control;
|
||||||
use client_core::client::real_messages_control::RealMessagesController;
|
use client_core::client::real_messages_control::RealMessagesController;
|
||||||
use client_core::client::received_buffer::{
|
use client_core::client::received_buffer::{
|
||||||
@@ -263,13 +261,13 @@ impl NymClient {
|
|||||||
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
||||||
// requests?
|
// requests?
|
||||||
fn start_mix_traffic_controller(
|
fn start_mix_traffic_controller(
|
||||||
&mut self,
|
|
||||||
mix_rx: BatchMixMessageReceiver,
|
|
||||||
gateway_client: GatewayClient,
|
gateway_client: GatewayClient,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) -> BatchMixMessageSender {
|
||||||
info!("Starting mix traffic controller...");
|
info!("Starting mix traffic controller...");
|
||||||
MixTrafficController::new(mix_rx, gateway_client).start_with_shutdown(shutdown);
|
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
|
||||||
|
mix_traffic_controller.start_with_shutdown(shutdown);
|
||||||
|
mix_tx
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_websocket_listener(
|
fn start_websocket_listener(
|
||||||
@@ -285,31 +283,41 @@ impl NymClient {
|
|||||||
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// EXPERIMENTAL DIRECT RUST API
|
// EXPERIMENTAL DIRECT RUST API
|
||||||
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
||||||
/// well enough in local tests)
|
// well enough in local tests)
|
||||||
pub fn send_message(&mut self, recipient: Recipient, message: Vec<u8>, with_reply_surb: bool) {
|
//pub async fn send_message(&mut self, recipient: Recipient, message: Vec<u8>, with_reply_surb: bool) {
|
||||||
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
// let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
||||||
|
|
||||||
self.input_tx
|
// //self.input_tx
|
||||||
.as_ref()
|
// // .as_ref()
|
||||||
.expect("start method was not called before!")
|
// // .expect("start method was not called before!")
|
||||||
.unbounded_send(input_msg)
|
// // .unbounded_send(input_msg)
|
||||||
.unwrap();
|
// // .unwrap();
|
||||||
}
|
// log::info!("input_tx capacity: {}", self.input_tx.capacity());
|
||||||
|
// if let Err(err) = self
|
||||||
|
// .input_tx
|
||||||
|
// .as_ref()
|
||||||
|
// .expect("start method was not called before!")
|
||||||
|
// .send(input_msg).await
|
||||||
|
// {
|
||||||
|
// log::error!("failed to send");
|
||||||
|
// panic!();
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
/// EXPERIMENTAL DIRECT RUST API
|
// EXPERIMENTAL DIRECT RUST API
|
||||||
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
||||||
/// well enough in local tests)
|
// well enough in local tests)
|
||||||
pub fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
|
//pub fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
|
||||||
let input_msg = InputMessage::new_reply(reply_surb, message);
|
// let input_msg = InputMessage::new_reply(reply_surb, message);
|
||||||
|
|
||||||
self.input_tx
|
// self.input_tx
|
||||||
.as_ref()
|
// .as_ref()
|
||||||
.expect("start method was not called before!")
|
// .expect("start method was not called before!")
|
||||||
.unbounded_send(input_msg)
|
// .unbounded_send(input_msg)
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
}
|
//}
|
||||||
|
|
||||||
/// EXPERIMENTAL DIRECT RUST API
|
/// EXPERIMENTAL DIRECT RUST API
|
||||||
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
|
||||||
@@ -356,11 +364,6 @@ impl NymClient {
|
|||||||
// rather than creating them here, so say for example the buffer controller would create the request channels
|
// rather than creating them here, so say for example the buffer controller would create the request channels
|
||||||
// and would allow anyone to clone the sender channel
|
// and would allow anyone to clone the sender channel
|
||||||
|
|
||||||
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
|
||||||
// they are used by cover traffic stream and real traffic stream
|
|
||||||
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
|
||||||
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
|
||||||
|
|
||||||
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
||||||
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
||||||
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
||||||
@@ -369,7 +372,8 @@ impl NymClient {
|
|||||||
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
// channels responsible for controlling real messages
|
// channels responsible for controlling real messages
|
||||||
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
|
//let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
|
||||||
|
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(16);
|
||||||
|
|
||||||
// channels responsible for controlling ack messages
|
// channels responsible for controlling ack messages
|
||||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||||
@@ -397,11 +401,13 @@ impl NymClient {
|
|||||||
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.start_mix_traffic_controller(
|
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
|
||||||
sphinx_message_receiver,
|
// that are to be sent to the mixnet. They are used by cover traffic stream and real
|
||||||
gateway_client,
|
// traffic stream.
|
||||||
shutdown.subscribe(),
|
// The MixTrafficController then sends the actual traffic
|
||||||
);
|
let sphinx_message_sender =
|
||||||
|
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ impl Handler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_send(
|
async fn handle_send(
|
||||||
&mut self,
|
&mut self,
|
||||||
recipient: Recipient,
|
recipient: Recipient,
|
||||||
message: Vec<u8>,
|
message: Vec<u8>,
|
||||||
@@ -84,18 +84,31 @@ impl Handler {
|
|||||||
) -> Option<ServerResponse> {
|
) -> Option<ServerResponse> {
|
||||||
// the ack control is now responsible for chunking, etc.
|
// the ack control is now responsible for chunking, etc.
|
||||||
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
|
||||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
log::info!("msg_input capacity: {}", self.msg_input.capacity());
|
||||||
|
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||||
|
log::error!("send failed");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
|
async fn handle_reply(
|
||||||
|
&mut self,
|
||||||
|
reply_surb: ReplySurb,
|
||||||
|
message: Vec<u8>,
|
||||||
|
) -> Option<ServerResponse> {
|
||||||
if message.len() > ReplySurb::max_msg_len(Default::default()) {
|
if message.len() > ReplySurb::max_msg_len(Default::default()) {
|
||||||
return Some(ServerResponse::new_error(format!("too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes", message.len(), ReplySurb::max_msg_len(Default::default()))));
|
return Some(ServerResponse::new_error(format!("too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes", message.len(), ReplySurb::max_msg_len(Default::default()))));
|
||||||
}
|
}
|
||||||
|
|
||||||
let input_msg = InputMessage::new_reply(reply_surb, message);
|
let input_msg = InputMessage::new_reply(reply_surb, message);
|
||||||
self.msg_input.unbounded_send(input_msg).unwrap();
|
//self.msg_input.unbounded_send(input_msg).unwrap();
|
||||||
|
log::info!("msg_input capacity: {}", self.msg_input.capacity());
|
||||||
|
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||||
|
log::error!("send failed");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -104,22 +117,22 @@ impl Handler {
|
|||||||
ServerResponse::SelfAddress(self.self_full_address)
|
ServerResponse::SelfAddress(self.self_full_address)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
|
||||||
match request {
|
match request {
|
||||||
ClientRequest::Send {
|
ClientRequest::Send {
|
||||||
recipient,
|
recipient,
|
||||||
message,
|
message,
|
||||||
with_reply_surb,
|
with_reply_surb,
|
||||||
} => self.handle_send(recipient, message, with_reply_surb),
|
} => self.handle_send(recipient, message, with_reply_surb).await,
|
||||||
ClientRequest::Reply {
|
ClientRequest::Reply {
|
||||||
message,
|
message,
|
||||||
reply_surb,
|
reply_surb,
|
||||||
} => self.handle_reply(reply_surb, message),
|
} => self.handle_reply(reply_surb, message).await,
|
||||||
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
ClientRequest::SelfAddress => Some(self.handle_self_address()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
|
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
|
||||||
debug!("Handling text message request");
|
debug!("Handling text message request");
|
||||||
trace!("Content: {:?}", msg);
|
trace!("Content: {:?}", msg);
|
||||||
|
|
||||||
@@ -128,13 +141,13 @@ impl Handler {
|
|||||||
|
|
||||||
let response = match client_request {
|
let response = match client_request {
|
||||||
Err(err) => Some(ServerResponse::Error(err)),
|
Err(err) => Some(ServerResponse::Error(err)),
|
||||||
Ok(req) => self.handle_request(req),
|
Ok(req) => self.handle_request(req).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
response.map(|resp| WsMessage::text(resp.into_text()))
|
response.map(|resp| WsMessage::text(resp.into_text()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
|
async fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
|
||||||
debug!("Handling binary message request");
|
debug!("Handling binary message request");
|
||||||
|
|
||||||
self.received_response_type = ReceivedResponseType::Binary;
|
self.received_response_type = ReceivedResponseType::Binary;
|
||||||
@@ -142,19 +155,19 @@ impl Handler {
|
|||||||
|
|
||||||
let response = match client_request {
|
let response = match client_request {
|
||||||
Err(err) => Some(ServerResponse::Error(err)),
|
Err(err) => Some(ServerResponse::Error(err)),
|
||||||
Ok(req) => self.handle_request(req),
|
Ok(req) => self.handle_request(req).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
response.map(|resp| WsMessage::Binary(resp.into_binary()))
|
response.map(|resp| WsMessage::Binary(resp.into_binary()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
|
async fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
|
||||||
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
||||||
// them and let's test that claim. If that's not the case, just copy code from
|
// them and let's test that claim. If that's not the case, just copy code from
|
||||||
// old version of this file.
|
// old version of this file.
|
||||||
match raw_request {
|
match raw_request {
|
||||||
WsMessage::Text(text_message) => self.handle_text_message(text_message),
|
WsMessage::Text(text_message) => self.handle_text_message(text_message).await,
|
||||||
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message),
|
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message).await,
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -244,7 +257,7 @@ impl Handler {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(response) = self.handle_ws_request(socket_msg) {
|
if let Some(response) = self.handle_ws_request(socket_msg).await {
|
||||||
if let Err(err) = self.send_websocket_response(response).await {
|
if let Err(err) = self.send_websocket_response(response).await {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to send message over websocket: {}. Assuming the connection is dead.",
|
"Failed to send message over websocket: {}. Assuming the connection is dead.",
|
||||||
|
|||||||
@@ -13,9 +13,7 @@ use client_core::client::inbound_messages::{
|
|||||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||||
};
|
};
|
||||||
use client_core::client::key_manager::KeyManager;
|
use client_core::client::key_manager::KeyManager;
|
||||||
use client_core::client::mix_traffic::{
|
use client_core::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
|
||||||
BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController,
|
|
||||||
};
|
|
||||||
use client_core::client::real_messages_control::RealMessagesController;
|
use client_core::client::real_messages_control::RealMessagesController;
|
||||||
use client_core::client::received_buffer::{
|
use client_core::client::received_buffer::{
|
||||||
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
|
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
|
||||||
@@ -263,13 +261,13 @@ impl NymClient {
|
|||||||
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
||||||
// requests?
|
// requests?
|
||||||
fn start_mix_traffic_controller(
|
fn start_mix_traffic_controller(
|
||||||
&mut self,
|
|
||||||
mix_rx: BatchMixMessageReceiver,
|
|
||||||
gateway_client: GatewayClient,
|
gateway_client: GatewayClient,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) -> BatchMixMessageSender {
|
||||||
info!("Starting mix traffic controller...");
|
info!("Starting mix traffic controller...");
|
||||||
MixTrafficController::new(mix_rx, gateway_client).start_with_shutdown(shutdown);
|
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
|
||||||
|
mix_traffic_controller.start_with_shutdown(shutdown);
|
||||||
|
mix_tx
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_socks5_listener(
|
fn start_socks5_listener(
|
||||||
@@ -345,11 +343,6 @@ impl NymClient {
|
|||||||
// rather than creating them here, so say for example the buffer controller would create the request channels
|
// rather than creating them here, so say for example the buffer controller would create the request channels
|
||||||
// and would allow anyone to clone the sender channel
|
// and would allow anyone to clone the sender channel
|
||||||
|
|
||||||
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
|
||||||
// they are used by cover traffic stream and real traffic stream
|
|
||||||
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
|
||||||
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
|
||||||
|
|
||||||
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
||||||
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
||||||
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
||||||
@@ -358,7 +351,7 @@ impl NymClient {
|
|||||||
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
// channels responsible for controlling real messages
|
// channels responsible for controlling real messages
|
||||||
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
|
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(16);
|
||||||
|
|
||||||
// channels responsible for controlling ack messages
|
// channels responsible for controlling ack messages
|
||||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||||
@@ -386,11 +379,13 @@ impl NymClient {
|
|||||||
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.start_mix_traffic_controller(
|
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
|
||||||
sphinx_message_receiver,
|
// that are to be sent to the mixnet. They are used by cover traffic stream and real
|
||||||
gateway_client,
|
// traffic stream.
|
||||||
shutdown.subscribe(),
|
// The MixTrafficController then sends the actual traffic
|
||||||
);
|
let sphinx_message_sender =
|
||||||
|
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
reply_key_storage,
|
reply_key_storage,
|
||||||
|
|||||||
@@ -231,7 +231,12 @@ impl SocksClient {
|
|||||||
let msg = Message::Request(req);
|
let msg = Message::Request(req);
|
||||||
|
|
||||||
let input_message = InputMessage::new_fresh(self.service_provider, msg.into_bytes(), false);
|
let input_message = InputMessage::new_fresh(self.service_provider, msg.into_bytes(), false);
|
||||||
self.input_sender.unbounded_send(input_message).unwrap();
|
//self.input_sender.unbounded_send(input_message).unwrap();
|
||||||
|
log::info!("input_sender capacity: {}", self.input_sender.capacity());
|
||||||
|
if let Err(err) = self.input_sender.send(input_message).await {
|
||||||
|
log::info!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
|
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use client_core::client::{
|
|||||||
cover_traffic_stream::LoopCoverTrafficStream,
|
cover_traffic_stream::LoopCoverTrafficStream,
|
||||||
inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender},
|
inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender},
|
||||||
key_manager::KeyManager,
|
key_manager::KeyManager,
|
||||||
mix_traffic::{BatchMixMessageReceiver, BatchMixMessageSender, MixTrafficController},
|
mix_traffic::{BatchMixMessageSender, MixTrafficController},
|
||||||
real_messages_control::{self, RealMessagesController},
|
real_messages_control::{self, RealMessagesController},
|
||||||
received_buffer::{
|
received_buffer::{
|
||||||
ReceivedBufferMessage, ReceivedBufferRequestReceiver, ReceivedBufferRequestSender,
|
ReceivedBufferMessage, ReceivedBufferRequestReceiver, ReceivedBufferRequestSender,
|
||||||
@@ -253,13 +253,11 @@ impl NymClient {
|
|||||||
// TODO: if we want to send control messages to gateway_client, this CAN'T take the ownership
|
// TODO: if we want to send control messages to gateway_client, this CAN'T take the ownership
|
||||||
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
// over it. Perhaps GatewayClient needs to be thread-shareable or have some channel for
|
||||||
// requests?
|
// requests?
|
||||||
fn start_mix_traffic_controller(
|
fn start_mix_traffic_controller(gateway_client: GatewayClient) -> BatchMixMessageSender {
|
||||||
&mut self,
|
|
||||||
mix_rx: BatchMixMessageReceiver,
|
|
||||||
gateway_client: GatewayClient,
|
|
||||||
) {
|
|
||||||
console_log!("Starting mix traffic controller...");
|
console_log!("Starting mix traffic controller...");
|
||||||
MixTrafficController::new(mix_rx, gateway_client).start();
|
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_client);
|
||||||
|
mix_traffic_controller.start();
|
||||||
|
mix_tx
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this procedure is extremely overcomplicated, because it's based off native client's behaviour
|
// TODO: this procedure is extremely overcomplicated, because it's based off native client's behaviour
|
||||||
@@ -307,11 +305,6 @@ impl NymClient {
|
|||||||
// rather than creating them here, so say for example the buffer controller would create the request channels
|
// rather than creating them here, so say for example the buffer controller would create the request channels
|
||||||
// and would allow anyone to clone the sender channel
|
// and would allow anyone to clone the sender channel
|
||||||
|
|
||||||
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
|
|
||||||
// they are used by cover traffic stream and real traffic stream
|
|
||||||
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
|
|
||||||
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
|
|
||||||
|
|
||||||
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
|
||||||
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
|
||||||
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
|
||||||
@@ -339,7 +332,12 @@ impl NymClient {
|
|||||||
.start_gateway_client(mixnet_messages_sender, ack_sender)
|
.start_gateway_client(mixnet_messages_sender, ack_sender)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
|
// The sphinx_message_sender is the transmitter for any component generating sphinx packets
|
||||||
|
// that are to be sent to the mixnet. They are used by cover traffic stream and real
|
||||||
|
// traffic stream.
|
||||||
|
// The MixTrafficController then sends the actual traffic
|
||||||
|
let sphinx_message_sender = Self::start_mix_traffic_controller(gateway_client);
|
||||||
|
|
||||||
self.start_real_traffic_controller(
|
self.start_real_traffic_controller(
|
||||||
shared_topology_accessor.clone(),
|
shared_topology_accessor.clone(),
|
||||||
ack_receiver,
|
ack_receiver,
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ use task::ShutdownListener;
|
|||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
|
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
|
||||||
|
|
||||||
fn send_empty_close<F, S>(
|
async fn send_empty_close<F, S>(
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
message_sender: &mut OrderedMessageSender,
|
message_sender: &mut OrderedMessageSender,
|
||||||
mix_sender: &MixProxySender<S>,
|
mix_sender: &MixProxySender<S>,
|
||||||
@@ -24,12 +24,20 @@ fn send_empty_close<F, S>(
|
|||||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
F: Fn(ConnectionId, Vec<u8>, bool) -> S,
|
||||||
{
|
{
|
||||||
let ordered_msg = message_sender.wrap_message(Vec::new()).into_bytes();
|
let ordered_msg = message_sender.wrap_message(Vec::new()).into_bytes();
|
||||||
mix_sender
|
//mix_sender
|
||||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, true))
|
// .unbounded_send(adapter_fn(connection_id, ordered_msg, true))
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
|
log::info!("mix_sender capacity: {}", mix_sender.capacity());
|
||||||
|
if let Err(err) = mix_sender
|
||||||
|
.send(adapter_fn(connection_id, ordered_msg, true))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deal_with_data<F, S>(
|
async fn deal_with_data<F, S>(
|
||||||
read_data: Option<io::Result<Bytes>>,
|
read_data: Option<io::Result<Bytes>>,
|
||||||
local_destination_address: &str,
|
local_destination_address: &str,
|
||||||
remote_source_address: &str,
|
remote_source_address: &str,
|
||||||
@@ -63,9 +71,17 @@ where
|
|||||||
|
|
||||||
// if we're sending through the mixnet increase the sequence number...
|
// if we're sending through the mixnet increase the sequence number...
|
||||||
let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes();
|
let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes();
|
||||||
mix_sender
|
//mix_sender
|
||||||
.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
// .unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
|
log::info!("mix_sender capacity: {}", mix_sender.capacity());
|
||||||
|
if let Err(err) = mix_sender
|
||||||
|
.send(adapter_fn(connection_id, ordered_msg, is_finished))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
if is_finished {
|
if is_finished {
|
||||||
// technically we already informed it when we sent the message to mixnet above
|
// technically we already informed it when we sent the message to mixnet above
|
||||||
@@ -98,7 +114,7 @@ where
|
|||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
read_data = &mut available_reader.next() => {
|
read_data = &mut available_reader.next() => {
|
||||||
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn) {
|
if deal_with_data(read_data, &local_destination_address, &remote_source_address, connection_id, &mut message_sender, &mix_sender, &adapter_fn).await {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,7 +29,8 @@ impl From<(Vec<u8>, bool)> for ProxyMessage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type MixProxySender<S> = mpsc::UnboundedSender<S>;
|
//pub type MixProxySender<S> = mpsc::UnboundedSender<S>;
|
||||||
|
pub type MixProxySender<S> = tokio::sync::mpsc::Sender<S>;
|
||||||
|
|
||||||
// TODO: when we finally get to implementing graceful shutdown,
|
// TODO: when we finally get to implementing graceful shutdown,
|
||||||
// on Drop this guy should tell the remote that it's closed now
|
// on Drop this guy should tell the remote that it's closed now
|
||||||
@@ -78,7 +79,7 @@ where
|
|||||||
// request/response as required by entity running particular side of the proxy.
|
// request/response as required by entity running particular side of the proxy.
|
||||||
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
pub async fn run<F>(mut self, adapter_fn: F) -> Self
|
||||||
where
|
where
|
||||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static + std::marker::Sync,
|
||||||
{
|
{
|
||||||
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
let (read_half, write_half) = self.socket.take().unwrap().into_split();
|
||||||
let shutdown_notify = Arc::new(Notify::new());
|
let shutdown_notify = Arc::new(Notify::new());
|
||||||
|
|||||||
@@ -40,7 +40,8 @@ impl Connection {
|
|||||||
pub(crate) async fn run_proxy(
|
pub(crate) async fn run_proxy(
|
||||||
&mut self,
|
&mut self,
|
||||||
mix_receiver: ConnectionReceiver,
|
mix_receiver: ConnectionReceiver,
|
||||||
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
//mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
mix_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let stream = self.conn.take().unwrap();
|
let stream = self.conn.take().unwrap();
|
||||||
|
|||||||
@@ -66,11 +66,12 @@ impl ServiceProvider {
|
|||||||
/// via the `websocket_writer`.
|
/// via the `websocket_writer`.
|
||||||
async fn mixnet_response_listener(
|
async fn mixnet_response_listener(
|
||||||
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
|
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
|
||||||
mut mix_reader: mpsc::UnboundedReceiver<(Socks5Message, Recipient)>,
|
//mut mix_reader: mpsc::UnboundedReceiver<(Socks5Message, Recipient)>,
|
||||||
|
mut mix_reader: tokio::sync::mpsc::Receiver<(Socks5Message, Recipient)>,
|
||||||
stats_collector: Option<ServiceStatisticsCollector>,
|
stats_collector: Option<ServiceStatisticsCollector>,
|
||||||
) {
|
) {
|
||||||
// TODO: wire SURBs in here once they're available
|
// TODO: wire SURBs in here once they're available
|
||||||
while let Some((msg, return_address)) = mix_reader.next().await {
|
while let Some((msg, return_address)) = mix_reader.recv().await {
|
||||||
if let Some(stats_collector) = stats_collector.as_ref() {
|
if let Some(stats_collector) = stats_collector.as_ref() {
|
||||||
if let Some(remote_addr) = stats_collector
|
if let Some(remote_addr) = stats_collector
|
||||||
.connected_services
|
.connected_services
|
||||||
@@ -134,7 +135,8 @@ impl ServiceProvider {
|
|||||||
remote_addr: String,
|
remote_addr: String,
|
||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
controller_sender: ControllerSender,
|
controller_sender: ControllerSender,
|
||||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
//mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||||
@@ -147,12 +149,23 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// inform the remote that the connection is closed before it even was established
|
// inform the remote that the connection is closed before it even was established
|
||||||
mix_input_sender
|
//mix_input_sender
|
||||||
.unbounded_send((
|
// .unbounded_send((
|
||||||
|
// Socks5Message::Response(Response::new(conn_id, Vec::new(), true)),
|
||||||
|
// return_address,
|
||||||
|
// ))
|
||||||
|
// .unwrap();
|
||||||
|
log::info!("mix_input_sender capacity: {}", mix_input_sender.capacity());
|
||||||
|
if let Err(err) = mix_input_sender
|
||||||
|
.send((
|
||||||
Socks5Message::Response(Response::new(conn_id, Vec::new(), true)),
|
Socks5Message::Response(Response::new(conn_id, Vec::new(), true)),
|
||||||
return_address,
|
return_address,
|
||||||
))
|
))
|
||||||
.unwrap();
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -188,10 +201,11 @@ impl ServiceProvider {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_proxy_connect(
|
async fn handle_proxy_connect(
|
||||||
&mut self,
|
&mut self,
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
//mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
remote_addr: String,
|
remote_addr: String,
|
||||||
return_address: Recipient,
|
return_address: Recipient,
|
||||||
@@ -200,14 +214,27 @@ impl ServiceProvider {
|
|||||||
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
||||||
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
|
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
|
||||||
log::info!("{}", log_msg);
|
log::info!("{}", log_msg);
|
||||||
mix_input_sender
|
//mix_input_sender
|
||||||
.unbounded_send((
|
// .unbounded_send((
|
||||||
|
// Socks5Message::NetworkRequesterResponse(NetworkRequesterResponse::new(
|
||||||
|
// conn_id, log_msg,
|
||||||
|
// )),
|
||||||
|
// return_address,
|
||||||
|
// ))
|
||||||
|
// .unwrap();
|
||||||
|
log::info!("mix_input_sender capacity: {}", mix_input_sender.capacity());
|
||||||
|
if let Err(err) = mix_input_sender
|
||||||
|
.send((
|
||||||
Socks5Message::NetworkRequesterResponse(NetworkRequesterResponse::new(
|
Socks5Message::NetworkRequesterResponse(NetworkRequesterResponse::new(
|
||||||
conn_id, log_msg,
|
conn_id, log_msg,
|
||||||
)),
|
)),
|
||||||
return_address,
|
return_address,
|
||||||
))
|
))
|
||||||
.unwrap();
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -244,7 +271,8 @@ impl ServiceProvider {
|
|||||||
&mut self,
|
&mut self,
|
||||||
raw_request: &[u8],
|
raw_request: &[u8],
|
||||||
controller_sender: &mut ControllerSender,
|
controller_sender: &mut ControllerSender,
|
||||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
//mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
mix_input_sender: &tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
stats_collector: Option<ServiceStatisticsCollector>,
|
stats_collector: Option<ServiceStatisticsCollector>,
|
||||||
shutdown: ShutdownListener,
|
shutdown: ShutdownListener,
|
||||||
) {
|
) {
|
||||||
@@ -273,6 +301,7 @@ impl ServiceProvider {
|
|||||||
req.return_address,
|
req.return_address,
|
||||||
shutdown,
|
shutdown,
|
||||||
)
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
Request::Send(conn_id, data, closed) => {
|
Request::Send(conn_id, data, closed) => {
|
||||||
@@ -307,7 +336,8 @@ impl ServiceProvider {
|
|||||||
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
|
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
|
||||||
// going to be used by `mixnet_response_listener`
|
// going to be used by `mixnet_response_listener`
|
||||||
let (mix_input_sender, mix_input_receiver) =
|
let (mix_input_sender, mix_input_receiver) =
|
||||||
mpsc::unbounded::<(Socks5Message, Recipient)>();
|
tokio::sync::mpsc::channel::<(Socks5Message, Recipient)>(16);
|
||||||
|
//mpsc::unbounded::<(Socks5Message, Recipient)>();
|
||||||
|
|
||||||
// Controller for managing all active connections.
|
// Controller for managing all active connections.
|
||||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||||
|
|||||||
@@ -77,13 +77,15 @@ pub struct ServiceStatisticsCollector {
|
|||||||
pub(crate) response_stats_data: Arc<RwLock<StatsData>>,
|
pub(crate) response_stats_data: Arc<RwLock<StatsData>>,
|
||||||
pub(crate) connected_services: Arc<RwLock<HashMap<ConnectionId, RemoteAddress>>>,
|
pub(crate) connected_services: Arc<RwLock<HashMap<ConnectionId, RemoteAddress>>>,
|
||||||
stats_provider_addr: Recipient,
|
stats_provider_addr: Recipient,
|
||||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
//mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
|
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceStatisticsCollector {
|
impl ServiceStatisticsCollector {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
stats_provider_addr: Option<Recipient>,
|
stats_provider_addr: Option<Recipient>,
|
||||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
mix_input_sender: tokio::sync::mpsc::Sender<(Socks5Message, Recipient)>,
|
||||||
|
//mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||||
) -> Result<Self, StatsError> {
|
) -> Result<Self, StatsError> {
|
||||||
let client = reqwest::Client::builder()
|
let client = reqwest::Client::builder()
|
||||||
.timeout(Duration::from_secs(3))
|
.timeout(Duration::from_secs(3))
|
||||||
@@ -175,20 +177,50 @@ impl StatisticsCollector for ServiceStatisticsCollector {
|
|||||||
),
|
),
|
||||||
self.stats_provider_addr,
|
self.stats_provider_addr,
|
||||||
);
|
);
|
||||||
self.mix_input_sender
|
//self.mix_input_sender
|
||||||
.unbounded_send((
|
// .unbounded_send((
|
||||||
|
// Socks5Message::Request(connect_req),
|
||||||
|
// self.stats_provider_addr,
|
||||||
|
// ))
|
||||||
|
// .unwrap();
|
||||||
|
log::info!(
|
||||||
|
"mix_input_sender capacity: {}",
|
||||||
|
self.mix_input_sender.capacity()
|
||||||
|
);
|
||||||
|
if let Err(err) = self
|
||||||
|
.mix_input_sender
|
||||||
|
.send((
|
||||||
Socks5Message::Request(connect_req),
|
Socks5Message::Request(connect_req),
|
||||||
self.stats_provider_addr,
|
self.stats_provider_addr,
|
||||||
))
|
))
|
||||||
.unwrap();
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
trace!("Sending data to statistics service");
|
trace!("Sending data to statistics service");
|
||||||
let mut message_sender = OrderedMessageSender::new();
|
let mut message_sender = OrderedMessageSender::new();
|
||||||
let ordered_msg = message_sender.wrap_message(msg).into_bytes();
|
let ordered_msg = message_sender.wrap_message(msg).into_bytes();
|
||||||
let send_req = Request::new_send(conn_id, ordered_msg, true);
|
let send_req = Request::new_send(conn_id, ordered_msg, true);
|
||||||
self.mix_input_sender
|
//self.mix_input_sender
|
||||||
.unbounded_send((Socks5Message::Request(send_req), self.stats_provider_addr))
|
// .unbounded_send((Socks5Message::Request(send_req), self.stats_provider_addr))
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
|
log::info!(
|
||||||
|
"mix_input_sender capacity: {}",
|
||||||
|
self.mix_input_sender.capacity()
|
||||||
|
);
|
||||||
|
if let Err(err) = self
|
||||||
|
.mix_input_sender
|
||||||
|
.send((
|
||||||
|
Socks5Message::Request(send_req),
|
||||||
|
self.stats_provider_addr,
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("failed to send");
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user