Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad 12886852cc WIP: backpressure using tokio Sender 2022-10-19 11:39:03 +02:00
Jon Häggblad 7058e7139d WIP: backpressure in out queue control 2022-10-19 00:15:19 +02:00
Jon Häggblad 73f3552b52 WIP: bounded channels and console subscriber 2022-10-17 16:00:42 +02:00
Jon Häggblad 54a3a5b5d8 WIP 2022-10-17 12:18:03 +02:00
Jon Häggblad c0044274f7 WIP 2022-10-17 12:18:03 +02:00
23 changed files with 400 additions and 147 deletions
Generated
+124 -104
View File
@@ -193,6 +193,51 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa 1.0.1",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"sync_wrapper",
"tokio",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"tower-layer",
"tower-service",
]
[[package]]
name = "bandwidth-claim-contract"
version = "0.1.0"
@@ -685,22 +730,21 @@ dependencies = [
[[package]]
name = "console-api"
version = "0.1.2"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc347c19eb5b940f396ac155822caee6662f850d97306890ac3773ed76c90c5a"
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
dependencies = [
"prost 0.9.0",
"prost-types 0.9.0",
"prost 0.11.0",
"prost-types 0.11.1",
"tonic",
"tonic-build",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.3"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "565a7dfea2d10dd0e5c57cc394d5d441b1910960d8c9211ed14135e0e6ec3a20"
checksum = "22a3a81dfaf6b66bce5d159eddae701e3a002f194d378cbf7be5f053c281d9be"
dependencies = [
"console-api",
"crossbeam-channel",
@@ -708,7 +752,7 @@ dependencies = [
"futures",
"hdrhistogram",
"humantime 2.1.0",
"prost-types 0.9.0",
"prost-types 0.11.1",
"serde",
"serde_json",
"thread_local",
@@ -1664,12 +1708,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "fixedbitset"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "flate2"
version = "1.0.22"
@@ -2266,9 +2304,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.6"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes",
"fnv",
@@ -2277,15 +2315,21 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]]
name = "httparse"
version = "1.6.0"
@@ -2761,6 +2805,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matchit"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "maxminddb"
version = "0.23.0"
@@ -2903,12 +2953,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multisig-contract-common"
version = "0.1.0"
@@ -3114,6 +3158,7 @@ dependencies = [
"coconut-interface",
"completions",
"config",
"console-subscriber",
"credential-storage",
"credentials",
"crypto",
@@ -3154,6 +3199,7 @@ dependencies = [
"colored",
"completions",
"config",
"console-subscriber",
"credentials",
"crypto",
"dashmap",
@@ -3282,6 +3328,7 @@ dependencies = [
"coconut-interface",
"completions",
"config",
"console-subscriber",
"credential-storage",
"credentials",
"crypto",
@@ -3833,16 +3880,6 @@ dependencies = [
"sha-1 0.8.2",
]
[[package]]
name = "petgraph"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "pickledb"
version = "0.4.1"
@@ -4007,16 +4044,6 @@ dependencies = [
"yansi",
]
[[package]]
name = "prost"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [
"bytes",
"prost-derive 0.9.0",
]
[[package]]
name = "prost"
version = "0.10.3"
@@ -4028,36 +4055,13 @@ dependencies = [
]
[[package]]
name = "prost-build"
version = "0.9.0"
name = "prost"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7"
dependencies = [
"bytes",
"heck 0.3.3",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost 0.9.0",
"prost-types 0.9.0",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
"prost-derive 0.11.0",
]
[[package]]
@@ -4074,13 +4078,16 @@ dependencies = [
]
[[package]]
name = "prost-types"
version = "0.9.0"
name = "prost-derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364"
dependencies = [
"bytes",
"prost 0.9.0",
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@@ -4093,6 +4100,16 @@ dependencies = [
"prost 0.10.3",
]
[[package]]
name = "prost-types"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e"
dependencies = [
"bytes",
"prost 0.11.0",
]
[[package]]
name = "proxy-helpers"
version = "0.1.0"
@@ -5566,6 +5583,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "synstructure"
version = "0.12.6"
@@ -5974,12 +5997,13 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.6.2"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff08f4649d10a70ffa3522ca559031285d8e421d727ac85c60825761818f5d0a"
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
@@ -5991,11 +6015,11 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.9.0",
"prost-derive 0.9.0",
"prost 0.11.0",
"prost-derive 0.11.0",
"tokio",
"tokio-stream",
"tokio-util 0.6.9",
"tokio-util 0.7.3",
"tower",
"tower-layer",
"tower-service",
@@ -6003,18 +6027,6 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9403f1bafde247186684b230dc6f38b5cd514584e8bec1dd32514be4745fa757"
dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "topology"
version = "0.1.0"
@@ -6049,6 +6061,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
@@ -6681,17 +6712,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "which"
version = "4.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae"
dependencies = [
"either",
"lazy_static",
"libc",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -175,7 +175,10 @@ impl LoopCoverTrafficStream<OsRng> {
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
//self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
log::error!("Failed to send cover traffic: {}", err);
}
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
@@ -197,7 +200,7 @@ impl LoopCoverTrafficStream<OsRng> {
sample_poisson_duration(&mut self.rng, self.average_cover_message_sending_delay);
self.next_delay = Box::pin(time::sleep(sampled));
spawn_future(async move {
spawn_future("loop cover traffic stream", async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
@@ -208,6 +211,7 @@ impl LoopCoverTrafficStream<OsRng> {
}
next = self.next() => {
if next.is_some() {
//log::debug!("loop cover traffic: got next msg to send to mix traffic");
self.on_new_message().await;
} else {
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
+10 -4
View File
@@ -8,8 +8,12 @@ use gateway_client::GatewayClient;
use log::*;
use nymsphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
//pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
//pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
//pub type BatchMixMessageSender = mpsc::Sender<Vec<MixPacket>>;
//pub type BatchMixMessageReceiver = mpsc::Receiver<Vec<MixPacket>>;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
const MAX_FAILURE_COUNT: usize = 100;
@@ -38,6 +42,7 @@ impl MixTrafficController {
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
debug_assert!(!mix_packets.is_empty());
//log::debug!("on_messages: {}", mix_packets.len());
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
@@ -67,13 +72,14 @@ impl MixTrafficController {
#[cfg(not(target_arch = "wasm32"))]
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
spawn_future(async move {
spawn_future("mix traffic controller", async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.next() => match mix_packets {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
//log::debug!("received mix packet to send to gateway");
self.on_messages(mix_packets).await;
},
None => {
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
}
async fn on_ack(&mut self, ack_content: Vec<u8>) {
debug!("Received an ack");
trace!("Received an ack");
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
{
@@ -188,6 +188,7 @@ where
config.average_ack_delay,
)
.with_custom_real_message_packet_size(config.packet_size);
//.with_custom_real_message_packet_size(nymsphinx::params::PacketSize::ExtendedPacket);
// will listen for any acks coming from the network
let acknowledgement_listener = AcknowledgementListener::new(
@@ -243,7 +244,7 @@ where
let mut action_controller = self.action_controller;
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("acknowledgement listener", async move {
acknowledgement_listener
.run_with_shutdown(shutdown_handle)
.await;
@@ -251,7 +252,7 @@ where
});
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("input message listener", async move {
input_message_listener
.run_with_shutdown(shutdown_handle)
.await;
@@ -259,7 +260,7 @@ where
});
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("retransmission request listener", async move {
retransmission_request_listener
.run_with_shutdown(shutdown_handle)
.await;
@@ -267,14 +268,14 @@ where
});
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("sent notification listener", async move {
sent_notification_listener
.run_with_shutdown(shutdown_handle)
.await;
debug!("The sent notification listener has finished execution!");
});
spawn_future(async move {
spawn_future("action controller", async move {
action_controller.run_with_shutdown(shutdown).await;
debug!("The controller has finished execution!");
});
@@ -173,7 +173,7 @@ impl RealMessagesController<OsRng> {
let ack_control = self.ack_control;
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("out queue control", async move {
out_queue_control.run_with_shutdown(shutdown_handle).await;
debug!("The out queue controller has finished execution!");
});
@@ -4,7 +4,7 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use futures::channel::mpsc;
use futures::channel::mpsc::{self};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
@@ -20,6 +20,8 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
@@ -27,6 +29,8 @@ use tokio::time;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
const REDUCE_DELAY_THRESHOLD: usize = 100;
/// Configurable parameters of the `OutQueueControl`
pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
@@ -89,6 +93,13 @@ where
#[cfg(target_arch = "wasm32")]
next_delay: Option<Pin<Box<wasm_timer::Delay>>>,
/// The current message delay, only different from `average_message_sending_delay` if there has been
/// backpressure from the gateway client to slow down.
current_average_message_sending_delay: Duration,
/// Count the number of successful sends to determine if we should reduce sending delay
number_of_consectivive_successful_sends: usize,
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays.
mix_tx: BatchMixMessageSender,
@@ -151,11 +162,14 @@ where
our_full_destination: Recipient,
topology_access: TopologyAccessor,
) -> Self {
let current_average_message_sending_delay = config.average_message_sending_delay;
OutQueueControl {
config,
ack_key,
sent_notifier,
next_delay: None,
current_average_message_sending_delay,
number_of_consectivive_successful_sends: 0,
mix_tx,
real_receiver,
our_full_destination,
@@ -173,10 +187,12 @@ where
self.sent_notifier.unbounded_send(frag_id).unwrap();
}
#[allow(clippy::too_many_lines)]
async fn on_message(&mut self, next_message: StreamMessage) {
trace!("created new message");
let next_message = match next_message {
//let next_message = match next_message {
match next_message {
StreamMessage::Cover => {
// TODO for way down the line: in very rare cases (during topology update) we might have
// to wait a really tiny bit before actually obtaining the permit hence messing with our
@@ -195,7 +211,7 @@ where
}
let topology_ref = topology_ref_option.unwrap();
generate_loop_cover_packet(
let next_message = generate_loop_cover_packet(
&mut self.rng,
topology_ref,
&self.ack_key,
@@ -204,11 +220,120 @@ where
self.config.average_packet_delay,
self.config.cover_packet_size,
)
.expect("Somehow failed to generate a loop cover message with a valid topology")
.expect("Somehow failed to generate a loop cover message with a valid topology");
log::info!("capacity: {}", self.mix_tx.capacity());
if self.mix_tx.max_capacity() - self.mix_tx.capacity() > 4 && self.number_of_consectivive_successful_sends > 10 {
self.current_average_message_sending_delay =
self.current_average_message_sending_delay.mul_f64(1.2);
log::error!(
"new average_message_sending_delay: {:?}",
self.current_average_message_sending_delay
);
self.number_of_consectivive_successful_sends = 0;
}
match self.mix_tx.try_send(vec![next_message]) {
Ok(_) => {
self.number_of_consectivive_successful_sends += 1;
//if self.number_of_consectivive_successful_sends > REDUCE_DELAY_THRESHOLD {
// self.number_of_consectivive_successful_sends = 0;
// log::error!(
// "old average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
// );
// self.current_average_message_sending_delay =
// self.current_average_message_sending_delay.mul_f64(0.9);
// log::error!(
// "new average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
// );
//}
}
Err(TrySendError::Full(err)) => {
log::error!("Failed to send");
//self.number_of_consectivive_successful_sends = 0;
//// Increase average send delay
//log::error!(
// "old average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
//);
////self.current_average_message_sending_delay *= 2;
//self.current_average_message_sending_delay =
// self.current_average_message_sending_delay.mul_f64(1.2);
//log::error!(
// "new average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
//);
sleep(Duration::from_millis(100)).await;
}
Err(TrySendError::Closed(err)) => {
log::warn!("should not happen during normal operation!");
}
};
}
StreamMessage::Real(real_message) => {
self.sent_notify(real_message.fragment_id);
real_message.mix_packet
let RealMessage {
mix_packet: next_message,
fragment_id,
} = *real_message;
log::info!("capacity: {}", self.mix_tx.capacity());
if self.mix_tx.max_capacity() - self.mix_tx.capacity() > 4 && self.number_of_consectivive_successful_sends > 10 {
self.current_average_message_sending_delay =
self.current_average_message_sending_delay.mul_f64(1.2);
log::error!(
"new average_message_sending_delay: {:?}",
self.current_average_message_sending_delay
);
self.number_of_consectivive_successful_sends = 0;
}
match self.mix_tx.try_send(vec![next_message]) {
Ok(_) => {
self.sent_notify(fragment_id);
self.number_of_consectivive_successful_sends += 1;
//if self.number_of_consectivive_successful_sends > REDUCE_DELAY_THRESHOLD {
// self.number_of_consectivive_successful_sends = 0;
// log::error!(
// "old average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
// );
// self.current_average_message_sending_delay =
// self.current_average_message_sending_delay.mul_f64(0.9);
// log::error!(
// "new average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
// );
//}
}
Err(TrySendError::Full(err)) => {
log::error!("Failed to send, channel full, will retry: {}", fragment_id);
//self.number_of_consectivive_successful_sends = 0;
// Re-queue at the front
let mut msg = err;
assert!(msg.len() == 1);
let msg = msg.pop().unwrap();
let new_real_message = RealMessage::new(msg, real_message.fragment_id);
self.received_buffer.push_front(new_real_message);
// Increase average send delay
//log::error!(
// "old average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
//);
////self.current_average_message_sending_delay *= 2;
//self.current_average_message_sending_delay =
// self.current_average_message_sending_delay.mul_f64(1.2);
//log::error!(
// "new average_message_sending_delay: {:?}",
// self.current_average_message_sending_delay
//);
sleep(Duration::from_millis(100)).await;
}
Err(TrySendError::Closed(err)) => {
log::warn!("should not happen during normal operation!");
}
};
}
};
@@ -216,12 +341,13 @@ where
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
log::warn!(
"Failed to send {} packets (possible process shutdown?)",
err.into_inner().len()
);
}
//if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
// log::warn!(
// "Failed to send {} packets (possible process shutdown?)",
// err.into_inner().len()
// );
//}
//self.mix_tx.try_send(vec![next_message]).unwrap();
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
@@ -238,12 +364,15 @@ where
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
//log::debug!("poisson: pending");
return Poll::Pending;
//} else {
//log::debug!("poisson: not pending");
};
// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let avg_delay = self.config.average_message_sending_delay;
let avg_delay = self.current_average_message_sending_delay;
let next_poisson_delay = sample_poisson_duration(&mut self.rng, avg_delay);
// The next interval value is `next_poisson_delay` after the one that just
@@ -262,6 +391,11 @@ where
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
//log::debug!("real available");
log::debug!(
"sending from received_buffer: {}",
self.received_buffer.len()
);
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
@@ -273,6 +407,7 @@ where
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
log::debug!("sending from real_receiver: {}", real_messages.len());
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
@@ -281,9 +416,13 @@ where
}
// otherwise construct a dummy one
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
Poll::Pending => {
//log::debug!("cover message");
Poll::Ready(Some(StreamMessage::Cover))
}
}
} else {
log::debug!("poisson: not send");
// we never set an initial delay - let's do it now
cx.waker().wake_by_ref();
@@ -347,14 +486,29 @@ where
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
debug!("Started OutQueueControl with graceful shutdown support");
//let stream = self.delay_stream();
//let stream = delay_stream(self.config.average_message_sending_delay);
//tokio::pin!(stream);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
}
//next_message = stream.next() => match next_message {
// Some(next_message) => {
// log::debug!("out queue control: got next_message to send to mix traffic");
// self.on_message(next_message).await;
// },
// None => {
// log::trace!("OutQueueControl: Stopping since channel closed");
// break;
// }
//}
next_message = self.next() => match next_message {
Some(next_message) => {
//log::debug!("out queue control: got next_message to send to mix traffic");
self.on_message(next_message).await;
},
None => {
@@ -378,6 +532,19 @@ where
}
}
//fn delay_stream(avg_delay: Duration) -> impl futures::Stream<Item = StreamMessage> + 'static {
// let mut rng = rand::rngs::OsRng;
//
// futures::stream::unfold((rng, avg_delay), |(mut rng, avg_delay)| async {
// //let avg_delay = out_queue_control.config.average_message_sending_delay;
// let next_poisson_delay = sample_poisson_duration(&mut rng, avg_delay);
// time::sleep(next_poisson_delay).await;
// //let a = 1;
// let msg = StreamMessage::Cover;
// Some((msg, (rng, avg_delay)))
// })
//}
impl<R> Stream for OutQueueControl<R>
where
R: CryptoRng + Rng + Unpin,
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
}
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
debug!(
trace!(
"Processing {:?} new message that might get added to the buffer!",
msgs.len()
);
@@ -436,12 +436,12 @@ impl ReceivedMessagesBufferController {
let mut request_receiver = self.request_receiver;
let shutdown_handle = shutdown.clone();
spawn_future(async move {
spawn_future("fragmented message receiver", async move {
fragmented_message_receiver
.run_with_shutdown(shutdown_handle)
.await;
});
spawn_future(async move {
spawn_future("request receiver", async move {
request_receiver.run_with_shutdown(shutdown).await;
});
}
@@ -305,7 +305,7 @@ impl TopologyRefresher {
#[cfg(not(target_arch = "wasm32"))]
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
spawn_future(async move {
spawn_future("topology refresher", async move {
debug!("Started TopologyRefresher with graceful shutdown support");
while !shutdown.is_shutdown() {
+24 -2
View File
@@ -13,11 +13,33 @@ where
wasm_bindgen_futures::spawn_local(future);
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn spawn_future<F>(future: F)
//#[cfg(not(target_arch = "wasm32"))]
//pub(crate) fn spawn_future<F>(future: F)
//where
// F: Future + Send + 'static,
// F::Output: Send + 'static,
//{
// tokio::spawn(future);
//}
pub(crate) fn spawn_future<F>(task_name: &str, future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
//tokio::task::Builder::default()
// .name(task_name)
// .spawn(future);
tokio::spawn(future);
}
//pub(crate) fn spawn_future_named<F>(future: F, task_name: &str)
//where
// F: Future + Send + 'static,
// F::Output: Send + 'static,
//{
// tokio::task::Builder::default()
// .name(task_name)
// .spawn(future);
// //tokio::spawn(future);
//}
+1
View File
@@ -28,6 +28,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] } # rng-related traits +
serde = { version = "1.0.104", features = ["derive"] } # for config serialization/deserialization
sled = "0.34" # for storage of replySURB decryption keys
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] } # async runtime
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
tokio-tungstenite = "0.14" # websocket
## internal
+3 -1
View File
@@ -359,7 +359,9 @@ impl NymClient {
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
// they are used by cover traffic stream and real traffic stream
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::channel(4);
let (sphinx_message_sender, sphinx_message_receiver) = tokio::sync::mpsc::channel(16);
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
+1
View File
@@ -21,6 +21,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] }
serde = { version = "1.0", features = ["derive"] } # for config serialization/deserialization
snafu = "0.6"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
url = "2.2"
# internal
+3 -1
View File
@@ -348,7 +348,9 @@ impl NymClient {
// sphinx_message_sender is the transmitter for any component generating sphinx packets that are to be sent to the mixnet
// they are used by cover traffic stream and real traffic stream
// sphinx_message_receiver is the receiver used by MixTrafficController that sends the actual traffic
let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::unbounded();
//let (sphinx_message_sender, sphinx_message_receiver) = mpsc::channel(4);
let (sphinx_message_sender, sphinx_message_receiver) = tokio::sync::mpsc::channel(16);
// unwrapped_sphinx_sender is the transmitter of mixnet messages received from the gateway
// unwrapped_sphinx_receiver is the receiver for said messages - used by ReceivedMessagesBuffer
+3
View File
@@ -10,6 +10,9 @@ pub mod socks;
#[tokio::main]
async fn main() {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
//console_subscriber::init();
setup_logging();
println!("{}", banner());
+2 -1
View File
@@ -35,7 +35,8 @@ default-features = false
# non-wasm-only dependencies
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
version = "1.21.2"
features = ["macros", "rt", "net", "sync", "time"]
#features = ["macros", "rt", "net", "sync", "time"]
features = ["full", "tracing"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.9"
@@ -383,11 +383,13 @@ impl GatewayClient {
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => {
log::debug!("SocketState::Available: sending size: {}", messages.len());
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
let mut send_stream = futures::stream::iter(stream_messages);
Ok(conn.send_all(&mut send_stream).await?)
}
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
log::debug!("SocketState::PartiallyDelegated: sending size: {}", messages.len());
if let Err(err) = partially_delegated
.batch_send_without_response(messages)
.await
@@ -415,8 +417,12 @@ impl GatewayClient {
msg: Message,
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
SocketState::Available(ref mut conn) => {
//log::debug!("SocketState::Available: conn.send()");
Ok(conn.send(msg).await?)
},
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
//log::debug!("SocketState::PartiallyDelegated: send_without_response()");
if let Err(err) = partially_delegated.send_without_response(msg).await {
error!("failed to send message without response - {}...", err);
// we must ensure we do not leave the task still active
@@ -607,6 +613,7 @@ impl GatewayClient {
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), GatewayClientError> {
//log::debug!("batch_send_mix_packets: {}", packets.len());
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
@@ -677,6 +684,7 @@ impl GatewayClient {
&mut self,
mix_packet: MixPacket,
) -> Result<(), GatewayClientError> {
//log::debug!("send_mix_packet");
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
@@ -23,7 +23,7 @@ pub struct Args {
#[clap(
value_parser,
requires = "fundsDenom",
requires = "funds-denom",
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
)]
pub funds: Option<u128>,
@@ -28,7 +28,7 @@ pub struct Args {
#[clap(
long,
requires = "fundsDenom",
requires = "funds-denom",
help = "Amount to supply as funds in micro denomination (e.g. unym or unyx)"
)]
pub funds: Option<u128>,
+1 -1
View File
@@ -9,7 +9,7 @@ MIX_DENOM_DISPLAY=nym
STAKE_DENOM=unyx
STAKE_DENOM_DISPLAY=nyx
DENOMS_EXPONENT=6
MIXNET_CONTRACT_ADDRESS=n1suhgf5svhu4usrurvxzlgn54ksxmn8gljarjtxqnapv8kjnp4nrsd3qaep
MIXNET_CONTRACT_ADDRESS=n1rjzps6qrmdqmf0xz4cn4x4rcmqeqzq6hnzqg4wcvd0r2lyasdq5sepn5s8
VESTING_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
BANDWIDTH_CLAIM_CONTRACT_ADDRESS=n19lc9u84cz0yz3fww5283nucc9yvr8gsjmgeul0
COCONUT_BANDWIDTH_CONTRACT_ADDRESS=n1ghd753shjuwexxywmgs4xz7x2q732vcn7ty4yw
+1
View File
@@ -49,6 +49,7 @@ tokio-stream = { version = "0.1.9", features = ["fs"] }
tokio-tungstenite = "0.14"
tokio-util = { version = "0.7.3", features = ["codec"] }
url = { version = "2.2", features = ["serde"] }
console-subscriber = { version = "0.1.8"} # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
# internal
coconut-interface = { path = "../common/coconut-interface", optional = true }
+3
View File
@@ -29,6 +29,9 @@ struct Cli {
#[tokio::main]
async fn main() {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
//console_subscriber::init();
setup_logging();
println!("{}", banner());
LONG_VERSION
@@ -120,7 +120,7 @@ impl<St: Storage> ConnectionHandler<St> {
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
if let Some(forward_ack) = forward_ack {
trace!(
debug!(
"Sending ack from packet for {} to {}",
client_address,
forward_ack.next_hop()
@@ -131,23 +131,32 @@ impl<St: Storage> ConnectionHandler<St> {
}
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
log::debug!("Handle processed packet");
let client_address = processed_final_hop.destination;
let message = processed_final_hop.message;
let forward_ack = processed_final_hop.forward_ack;
// we failed to push message directly to the client - it's probably offline.
// we should store it on the disk instead.
let t = tokio::time::Instant::now();
match self.try_push_message_to_client(client_address, message) {
Err(unsent_plaintext) => match self
.store_processed_packet_payload(client_address, unsent_plaintext)
.await
{
Err(err) => error!("Failed to store client data - {}", err),
Ok(_) => trace!("Stored packet for {}", client_address),
Ok(_) => debug!("Stored packet for {}", client_address),
},
Ok(_) => trace!("Pushed received packet to {}", client_address),
Ok(_) => debug!("Pushed received packet to {}", client_address),
}
let elapsed = t.elapsed();
if elapsed.as_millis() > 100 {
log::error!("push message top client, time elapsed: {:?}", elapsed);
}
// if we managed to either push message directly to the [online] client or store it at
// its inbox, it means that it must exist at this gateway, hence we can send the
// received ack back into the network
@@ -161,6 +170,8 @@ impl<St: Storage> ConnectionHandler<St> {
// question: can it also be per connection vs global?
//
log::debug!("Handle received packet");
let processed_final_hop = match self.packet_processor.process_received(framed_sphinx_packet)
{
Err(e) => {