Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad 73955ecfa3 WIP 2022-09-07 17:01:52 +02:00
Jon Häggblad 06ac21e21b more clippy 2022-09-07 15:39:40 +02:00
Jon Häggblad bb145a1cc3 clippy 2022-09-07 15:39:40 +02:00
Jon Häggblad 5b0d5ac2ba changelog: add note 2022-09-07 15:39:40 +02:00
Jon Häggblad 9a0086b5e8 socks5-client: graceful shutdown 2022-09-07 15:39:38 +02:00
40 changed files with 592 additions and 184 deletions
+2
View File
@@ -12,10 +12,12 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
### Changed
- validator-client: made `fee` argument optional for `execute` and `execute_multiple` ([#1541])
- socks5 client: graceful shutdown should fix error on disconnect in nym-connect ([#1591])
[#1541]: https://github.com/nymtech/nym/pull/1541
[#1558]: https://github.com/nymtech/nym/pull/1558
[#1577]: https://github.com/nymtech/nym/pull/1577
[#1591]: https://github.com/nymtech/nym/pull/1591
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
Generated
+8
View File
@@ -668,6 +668,7 @@ dependencies = [
"serde",
"sled",
"tap",
"task",
"tempfile",
"thiserror",
"tokio",
@@ -2054,8 +2055,10 @@ dependencies = [
"pemstore",
"rand 0.7.3",
"secp256k1",
"task",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.14.0",
"tungstenite 0.13.0",
"url",
@@ -3255,6 +3258,7 @@ dependencies = [
"serde",
"serde_json",
"sled",
"task",
"tokio",
"tokio-tungstenite 0.14.0",
"topology",
@@ -3375,6 +3379,7 @@ dependencies = [
"socks5-requests",
"sqlx 0.6.1",
"statistics-common",
"task",
"thiserror",
"tokio",
"tokio-tungstenite 0.17.2",
@@ -3424,6 +3429,7 @@ dependencies = [
"serde",
"snafu",
"socks5-requests",
"task",
"tokio",
"topology",
"url",
@@ -4258,6 +4264,7 @@ dependencies = [
"log",
"ordered-buffer",
"socks5-requests",
"task",
"tokio",
"tokio-test",
"tokio-util 0.7.3",
@@ -6138,6 +6145,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.7.3",
]
[[package]]
+1
View File
@@ -26,6 +26,7 @@ gateway-requests = { path = "../../gateway/gateway-requests" }
nonexhaustive-delayqueue = { path = "../../common/nonexhaustive-delayqueue" }
nymsphinx = { path = "../../common/nymsphinx" }
pemstore = { path = "../../common/pemstore" }
task = { path = "../../common/task" }
topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client" }
tap = "1.0.1"
@@ -13,6 +13,7 @@ use nymsphinx::utils::sample_poisson_duration;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::task::JoinHandle;
use tokio::time;
@@ -48,6 +49,9 @@ where
/// Accessor to the common instance of network topology.
topology_access: TopologyAccessor,
/// Listen to shutdown signals.
shutdown: ShutdownListener,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -84,6 +88,7 @@ where
// obviously when we finally make shared rng that is on 'higher' level, this should become
// generic `R`
impl LoopCoverTrafficStream<OsRng> {
#[allow(clippy::too_many_arguments)]
pub fn new(
ack_key: Arc<AckKey>,
average_ack_delay: time::Duration,
@@ -92,6 +97,7 @@ impl LoopCoverTrafficStream<OsRng> {
mix_tx: BatchMixMessageSender,
our_full_destination: Recipient,
topology_access: TopologyAccessor,
shutdown: ShutdownListener,
) -> Self {
let rng = OsRng;
@@ -105,6 +111,7 @@ impl LoopCoverTrafficStream<OsRng> {
our_full_destination,
rng,
topology_access,
shutdown,
}
}
@@ -159,9 +166,21 @@ impl LoopCoverTrafficStream<OsRng> {
self.average_cover_message_sending_delay,
)));
while self.next().await.is_some() {
self.on_new_message().await;
let mut shutdown = self.shutdown.clone();
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
}
}
}
}
log::debug!("LoopCoverTrafficStream: Exiting");
}
pub fn start(mut self) -> JoinHandle<()> {
+20 -2
View File
@@ -6,6 +6,7 @@ use futures::StreamExt;
use gateway_client::GatewayClient;
use log::*;
use nymsphinx::forwarding::packet::MixPacket;
use task::ShutdownListener;
use tokio::task::JoinHandle;
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
@@ -18,6 +19,7 @@ pub struct MixTrafficController {
// later on gateway_client will need to be accessible by other entities
gateway_client: GatewayClient,
mix_rx: BatchMixMessageReceiver,
shutdown: ShutdownListener,
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
@@ -28,10 +30,12 @@ impl MixTrafficController {
pub fn new(
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
shutdown: ShutdownListener,
) -> MixTrafficController {
MixTrafficController {
gateway_client,
mix_rx,
shutdown,
consecutive_gateway_failure_count: 0,
}
}
@@ -66,9 +70,23 @@ impl MixTrafficController {
}
pub async fn run(&mut self) {
while let Some(mix_packets) = self.mix_rx.next().await {
self.on_messages(mix_packets).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.next() => match mix_packets {
Some(mix_packets) => {
self.on_messages(mix_packets).await;
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
_ = self.shutdown.recv() => {
log::trace!("MixTrafficController: Received shutdown");
}
}
}
log::debug!("MixTrafficController: Exiting");
}
pub fn start(mut self) -> JoinHandle<()> {
+4
View File
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicBool;
pub mod cover_traffic_stream;
pub mod inbound_messages;
pub mod key_manager;
@@ -6,3 +8,5 @@ pub mod real_messages_control;
pub mod received_buffer;
pub mod reply_key_storage;
pub mod topology_control;
pub static SHUTDOWN_IS_SIGNALLED: AtomicBool = AtomicBool::new(false);
@@ -10,6 +10,7 @@ use nymsphinx::{
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use std::sync::Arc;
use task::ShutdownListener;
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
@@ -17,6 +18,7 @@ pub(super) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: ActionSender,
shutdown: ShutdownListener,
}
impl AcknowledgementListener {
@@ -24,11 +26,13 @@ impl AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: ActionSender,
shutdown: ShutdownListener,
) -> Self {
AcknowledgementListener {
ack_key,
ack_receiver,
action_sender,
shutdown,
}
}
@@ -65,12 +69,19 @@ impl AcknowledgementListener {
pub(super) async fn run(&mut self) {
debug!("Started AcknowledgementListener");
while let Some(acks) = self.ack_receiver.next().await {
// realistically we would only be getting one ack at the time
for ack in acks {
self.on_ack(ack).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
Some(acks) = self.ack_receiver.next() => {
// realistically we would only be getting one ack at the time
for ack in acks {
self.on_ack(ack).await;
}
},
_ = self.shutdown.recv() => {
log::trace!("AcknowledgementListener: Received shutdown");
}
}
}
error!("TODO: error msg. Or maybe panic?")
log::debug!("AcknowledgementListener: Exiting");
}
}
@@ -12,6 +12,7 @@ use nymsphinx::Delay as SphinxDelay;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use task::ShutdownListener;
pub(crate) type ActionSender = UnboundedSender<Action>;
@@ -99,12 +100,16 @@ pub(super) struct ActionController {
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
retransmission_sender: RetransmissionRequestSender,
/// Listen for shutdown notifications
shutdown: ShutdownListener,
}
impl ActionController {
pub(super) fn new(
config: Config,
retransmission_sender: RetransmissionRequestSender,
shutdown: ShutdownListener,
) -> (Self, ActionSender) {
let (sender, receiver) = mpsc::unbounded();
(
@@ -114,6 +119,7 @@ impl ActionController {
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions: receiver,
retransmission_sender,
shutdown,
},
sender,
)
@@ -246,14 +252,18 @@ impl ActionController {
}
pub(super) async fn run(&mut self) {
loop {
// at some point there will be a global shutdown signal here as the third option
while !self.shutdown.is_shutdown() {
tokio::select! {
// we NEVER expect for ANY sender to get dropped so unwrap here is fine
action = self.incoming_actions.next() => self.process_action(action.unwrap()),
// pending ack queue Stream CANNOT return a `None` so unwrap here is fine
expired_ack = self.pending_acks_timers.next() => self.handle_expired_ack_timer(expired_ack.unwrap())
expired_ack = self.pending_acks_timers.next() => self.handle_expired_ack_timer(expired_ack.unwrap()),
// listen for shutdown notifications
_ = self.shutdown.recv() => {
log::trace!("ActionController: Received shutdown");
}
}
}
log::debug!("ActionController: Exiting");
}
}
@@ -16,6 +16,7 @@ use nymsphinx::preparer::MessagePreparer;
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
use rand::{CryptoRng, Rng};
use std::sync::Arc;
use task::ShutdownListener;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
@@ -32,6 +33,7 @@ where
real_message_sender: BatchRealMessageSender,
topology_access: TopologyAccessor,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
}
impl<R> InputMessageListener<R>
@@ -50,6 +52,7 @@ where
real_message_sender: BatchRealMessageSender,
topology_access: TopologyAccessor,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
) -> Self {
InputMessageListener {
ack_key,
@@ -60,6 +63,7 @@ where
real_message_sender,
topology_access,
reply_key_storage,
shutdown,
}
}
@@ -182,9 +186,16 @@ where
pub(super) async fn run(&mut self) {
debug!("Started InputMessageListener");
while let Some(input_msg) = self.input_receiver.next().await {
self.on_input_message(input_msg).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
Some(input_msg) = self.input_receiver.next() => {
self.on_input_message(input_msg).await;
},
_ = self.shutdown.recv() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
}
error!("TODO: error msg. Or maybe panic?")
log::debug!("InputMessageListener: Exiting");
}
}
@@ -25,6 +25,7 @@ use std::{
sync::{Arc, Weak},
time::Duration,
};
use task::ShutdownListener;
use tokio::task::JoinHandle;
mod acknowledgement_listener;
@@ -152,6 +153,7 @@ impl<R> AcknowledgementController<R>
where
R: 'static + CryptoRng + Rng + Clone + Send,
{
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
config: Config,
rng: R,
@@ -160,13 +162,14 @@ where
ack_recipient: Recipient,
reply_key_storage: ReplyKeyStorage,
connectors: AcknowledgementControllerConnectors,
shutdown: ShutdownListener,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
let action_config =
action_controller::Config::new(config.ack_wait_addition, config.ack_wait_multiplier);
let (action_controller, action_sender) =
ActionController::new(action_config, retransmission_tx);
ActionController::new(action_config, retransmission_tx, shutdown.clone());
let message_preparer = MessagePreparer::new(
rng,
@@ -180,6 +183,7 @@ where
Arc::clone(&ack_key),
connectors.ack_receiver,
action_sender.clone(),
shutdown.clone(),
);
// will listen for any new messages from the client
@@ -192,6 +196,7 @@ where
connectors.real_message_sender.clone(),
topology_access.clone(),
reply_key_storage,
shutdown.clone(),
);
// will listen for any ack timeouts and trigger retransmission
@@ -203,12 +208,13 @@ where
connectors.real_message_sender,
retransmission_rx,
topology_access,
shutdown.clone(),
);
// will listen for events indicating the packet was sent through the network so that
// the retransmission timer should be started.
let sent_notification_listener =
SentNotificationListener::new(connectors.sent_notifier, action_sender);
SentNotificationListener::new(connectors.sent_notifier, action_sender, shutdown);
AcknowledgementController {
acknowledgement_listener: Some(acknowledgement_listener),
@@ -232,27 +238,27 @@ where
// graceful shutdowns.
let ack_listener_fut = tokio::spawn(async move {
acknowledgement_listener.run().await;
error!("The acknowledgement listener has finished execution!");
debug!("The acknowledgement listener has finished execution!");
acknowledgement_listener
});
let input_listener_fut = tokio::spawn(async move {
input_message_listener.run().await;
error!("The input listener has finished execution!");
debug!("The input listener has finished execution!");
input_message_listener
});
let retransmission_req_fut = tokio::spawn(async move {
retransmission_request_listener.run().await;
error!("The retransmission request listener has finished execution!");
debug!("The retransmission request listener has finished execution!");
retransmission_request_listener
});
let sent_notification_fut = tokio::spawn(async move {
sent_notification_listener.run().await;
error!("The sent notification listener has finished execution!");
debug!("The sent notification listener has finished execution!");
sent_notification_listener
});
let action_controller_fut = tokio::spawn(async move {
action_controller.run().await;
error!("The controller has finished execution!");
debug!("The controller has finished execution!");
action_controller
});
@@ -14,6 +14,7 @@ use nymsphinx::preparer::MessagePreparer;
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use task::ShutdownListener;
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R>
@@ -27,12 +28,14 @@ where
real_message_sender: BatchRealMessageSender,
request_receiver: RetransmissionRequestReceiver,
topology_access: TopologyAccessor,
shutdown: ShutdownListener,
}
impl<R> RetransmissionRequestListener<R>
where
R: CryptoRng + Rng,
{
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
ack_key: Arc<AckKey>,
ack_recipient: Recipient,
@@ -41,6 +44,7 @@ where
real_message_sender: BatchRealMessageSender,
request_receiver: RetransmissionRequestReceiver,
topology_access: TopologyAccessor,
shutdown: ShutdownListener,
) -> Self {
RetransmissionRequestListener {
ack_key,
@@ -50,6 +54,7 @@ where
real_message_sender,
request_receiver,
topology_access,
shutdown,
}
}
@@ -121,9 +126,17 @@ where
pub(super) async fn run(&mut self) {
debug!("Started RetransmissionRequestListener");
while let Some(timed_out_ack) = self.request_receiver.next().await {
self.on_retransmission_request(timed_out_ack).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
Some(timed_out_ack) = self.request_receiver.next() => {
self.on_retransmission_request(timed_out_ack).await;
},
_ = self.shutdown.recv() => {
log::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
error!("TODO: error msg. Or maybe panic?")
log::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -6,6 +6,7 @@ use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use log::*;
use nymsphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use task::ShutdownListener;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
@@ -14,16 +15,19 @@ use nymsphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
pub(super) struct SentNotificationListener {
sent_notifier: SentPacketNotificationReceiver,
action_sender: ActionSender,
shutdown: ShutdownListener,
}
impl SentNotificationListener {
pub(super) fn new(
sent_notifier: SentPacketNotificationReceiver,
action_sender: ActionSender,
shutdown: ShutdownListener,
) -> Self {
SentNotificationListener {
sent_notifier,
action_sender,
shutdown,
}
}
@@ -44,9 +48,16 @@ impl SentNotificationListener {
pub(super) async fn run(&mut self) {
debug!("Started SentNotificationListener");
while let Some(frag_id) = self.sent_notifier.next().await {
self.on_sent_message(frag_id).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
Some(frag_id) = self.sent_notifier.next() => {
self.on_sent_message(frag_id).await;
},
_ = self.shutdown.recv() => {
log::trace!("SentNotificationListener: Received shutdown");
}
}
}
error!("TODO: error msg. Or maybe panic?")
log::debug!("SentNotificationListener: Exiting");
}
}
@@ -22,6 +22,7 @@ use nymsphinx::addressing::clients::Recipient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use std::time::Duration;
use task::ShutdownListener;
use tokio::task::JoinHandle;
mod acknowledgement_control;
@@ -91,6 +92,7 @@ impl RealMessagesController<OsRng> {
mix_sender: BatchMixMessageSender,
topology_access: TopologyAccessor,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
) -> Self {
let rng = OsRng;
@@ -119,6 +121,7 @@ impl RealMessagesController<OsRng> {
config.self_recipient,
reply_key_storage,
ack_controller_connectors,
shutdown.clone(),
);
let out_queue_config = real_traffic_stream::Config::new(
@@ -136,6 +139,7 @@ impl RealMessagesController<OsRng> {
rng,
config.self_recipient,
topology_access,
shutdown,
);
RealMessagesController {
@@ -153,12 +157,12 @@ impl RealMessagesController<OsRng> {
// graceful shutdowns.
let out_queue_control_fut = tokio::spawn(async move {
out_queue_control.run_out_queue_control().await;
error!("The out queue controller has finished execution!");
debug!("The out queue controller has finished execution!");
out_queue_control
});
let ack_control_fut = tokio::spawn(async move {
ack_control.run().await;
error!("The acknowledgement controller has finished execution!");
debug!("The acknowledgement controller has finished execution!");
ack_control
});
@@ -19,6 +19,7 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use task::ShutdownListener;
use tokio::time;
/// Configurable parameters of the `OutQueueControl`
@@ -83,6 +84,9 @@ where
/// Buffer containing all real messages received. It is first exhausted before more are pulled.
received_buffer: VecDeque<RealMessage>,
/// Listens for shutdown signals
shutdown: ShutdownListener,
}
pub(crate) struct RealMessage {
@@ -174,6 +178,7 @@ where
rng: R,
our_full_destination: Recipient,
topology_access: TopologyAccessor,
shutdown: ShutdownListener,
) -> Self {
OutQueueControl {
config,
@@ -186,6 +191,7 @@ where
rng,
topology_access,
received_buffer: VecDeque::with_capacity(0), // we won't be putting any data into this guy directly
shutdown,
}
}
@@ -239,7 +245,15 @@ where
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
self.mix_tx.unbounded_send(vec![next_message]).unwrap();
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
if self.shutdown.is_shutdown_poll() {
log::info!("Failed to send (shutdown detected)");
} else {
// We don't try to limp along, panic to avoid continuing in a potentially
// inconsistent state
panic!("{err}");
}
}
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
@@ -257,9 +271,19 @@ where
self.config.average_message_sending_delay,
)));
while let Some(next_message) = self.next().await {
self.on_message(next_message).await;
let mut shutdown = self.shutdown.clone();
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
}
Some(next_message) = self.next() => {
self.on_message(next_message).await;
},
}
}
log::debug!("OutQueueControl: Exiting");
}
pub(crate) async fn run_out_queue_control(&mut self) {
@@ -15,6 +15,7 @@ use nymsphinx::params::{ReplySurbEncryptionAlgorithm, ReplySurbKeyDigestAlgorith
use nymsphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use std::collections::HashSet;
use std::sync::Arc;
use task::ShutdownListener;
use tokio::task::JoinHandle;
// Buffer Requests to say "hey, send any reconstructed messages to this channel"
@@ -292,16 +293,26 @@ impl RequestReceiver {
fn start(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(request) = self.query_receiver.next().await {
match request {
ReceivedBufferMessage::ReceiverAnnounce(sender) => {
self.received_buffer.connect_sender(sender).await;
}
ReceivedBufferMessage::ReceiverDisconnect => {
self.received_buffer.disconnect_sender().await
}
}
loop {
tokio::select! {
request = self.query_receiver.next() => {
match request {
Some(ReceivedBufferMessage::ReceiverAnnounce(sender)) => {
self.received_buffer.connect_sender(sender).await;
}
Some(ReceivedBufferMessage::ReceiverDisconnect) => {
self.received_buffer.disconnect_sender().await
}
None => {
log::trace!("RequestReceiver: Stopping since channel closed");
break;
},
}
},
};
}
log::debug!("RequestReceiver: Exiting");
})
}
}
@@ -309,23 +320,40 @@ impl RequestReceiver {
struct FragmentedMessageReceiver {
received_buffer: ReceivedMessagesBuffer,
mixnet_packet_receiver: MixnetMessageReceiver,
shutdown: ShutdownListener,
}
impl FragmentedMessageReceiver {
fn new(
received_buffer: ReceivedMessagesBuffer,
mixnet_packet_receiver: MixnetMessageReceiver,
shutdown: ShutdownListener,
) -> Self {
FragmentedMessageReceiver {
received_buffer,
mixnet_packet_receiver,
shutdown,
}
}
fn start(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(new_messages) = self.mixnet_packet_receiver.next().await {
self.received_buffer.handle_new_received(new_messages).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
new_messages = self.mixnet_packet_receiver.next() => match new_messages {
Some(new_messages) => {
self.received_buffer.handle_new_received(new_messages).await;
}
None => {
log::trace!("FragmentedMessageReceiver: Stopping since channel closed");
break;
}
},
_ = self.shutdown.recv() => {
log::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
log::debug!("FragmentedMessageReceiver: Exiting");
})
}
}
@@ -341,6 +369,7 @@ impl ReceivedMessagesBufferController {
query_receiver: ReceivedBufferRequestReceiver,
mixnet_packet_receiver: MixnetMessageReceiver,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
) -> Self {
let received_buffer =
ReceivedMessagesBuffer::new(local_encryption_keypair, reply_key_storage);
@@ -349,6 +378,7 @@ impl ReceivedMessagesBufferController {
fragmented_message_receiver: FragmentedMessageReceiver::new(
received_buffer.clone(),
mixnet_packet_receiver,
shutdown,
),
request_receiver: RequestReceiver::new(received_buffer, query_receiver),
}
@@ -10,6 +10,7 @@ use std::ops::Deref;
use std::sync::Arc;
use std::time;
use std::time::Duration;
use task::ShutdownListener;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;
use topology::{nym_topology_from_bonds, NymTopology};
@@ -303,12 +304,21 @@ impl TopologyRefresher {
self.topology_accessor.is_routable().await
}
pub fn start(mut self) -> JoinHandle<()> {
pub fn start(mut self, mut shutdown: ShutdownListener) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::time::sleep(self.refresh_rate).await;
self.refresh().await;
while !shutdown.is_shutdown() {
tokio::select! {
_ = tokio::time::sleep(self.refresh_rate) => {
self.refresh().await;
},
_ = shutdown.recv() => {
log::trace!("TopologyRefresher: Received shutdown");
},
}
}
let a = SHUTDOWN_IS_SIGNALED;
let a = crate::client::SHUTDOWN_IS_SIGNALLED.load(std::sync::atomic::Ordering::Relaxed);
log::debug!("TopologyRefresher: Exiting");
})
}
}
+5 -4
View File
@@ -33,19 +33,20 @@ tokio-tungstenite = "0.14" # websocket
## internal
client-core = { path = "../client-core" }
coconut-interface = { path = "../../common/coconut-interface", optional = true }
credentials = { path = "../../common/credentials", optional = true }
credential-storage = { path = "../../common/credential-storage" }
config = { path = "../../common/config" }
credential-storage = { path = "../../common/credential-storage" }
credentials = { path = "../../common/credentials", optional = true }
crypto = { path = "../../common/crypto" }
gateway-client = { path = "../../common/client-libs/gateway-client" }
gateway-requests = { path = "../../gateway/gateway-requests" }
network-defaults = { path = "../../common/network-defaults" }
nymsphinx = { path = "../../common/nymsphinx" }
pemstore = { path = "../../common/pemstore" }
task = { path = "../../common/task" }
topology = { path = "../../common/topology" }
websocket-requests = { path = "websocket-requests" }
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path = "../../common/version-checker" }
network-defaults = { path = "../../common/network-defaults" }
websocket-requests = { path = "websocket-requests" }
[features]
coconut = ["coconut-interface", "credentials", "credentials/coconut", "gateway-requests/coconut", "gateway-client/coconut", "client-core/coconut"]
+51 -16
View File
@@ -32,6 +32,7 @@ use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use nymsphinx::anonymous_replies::ReplySurb;
use nymsphinx::receiver::ReconstructedMessage;
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use crate::client::config::{Config, SocketType};
use crate::websocket;
@@ -85,6 +86,7 @@ impl NymClient {
&self,
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
info!("Starting loop cover traffic stream...");
@@ -98,6 +100,7 @@ impl NymClient {
mix_tx,
self.as_mix_recipient(),
topology_accessor,
shutdown,
)
.start();
}
@@ -109,6 +112,7 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
let controller_config = real_messages_control::Config::new(
self.key_manager.ack_key(),
@@ -129,6 +133,7 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
shutdown,
)
.start();
}
@@ -140,6 +145,7 @@ impl NymClient {
query_receiver: ReceivedBufferRequestReceiver,
mixnet_receiver: MixnetMessageReceiver,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
) {
info!("Starting received messages buffer controller...");
ReceivedMessagesBufferController::new(
@@ -147,6 +153,7 @@ impl NymClient {
query_receiver,
mixnet_receiver,
reply_key_storage,
shutdown,
)
.start()
}
@@ -155,6 +162,7 @@ impl NymClient {
&mut self,
mixnet_message_sender: MixnetMessageSender,
ack_sender: AcknowledgementSender,
shutdown: ShutdownListener,
) -> GatewayClient {
let gateway_id = self.config.get_base().get_gateway_id();
if gateway_id.is_empty() {
@@ -197,6 +205,7 @@ impl NymClient {
ack_sender,
self.config.get_base().get_gateway_response_timeout(),
Some(bandwidth_controller),
Some(shutdown),
);
gateway_client
@@ -212,7 +221,11 @@ impl NymClient {
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(&mut self, topology_accessor: TopologyAccessor) {
async fn start_topology_refresher(
&mut self,
topology_accessor: TopologyAccessor,
shutdown: ShutdownListener,
) {
let topology_refresher_config = TopologyRefresherConfig::new(
self.config.get_base().get_validator_api_endpoints(),
self.config.get_base().get_topology_refresh_rate(),
@@ -234,7 +247,7 @@ impl NymClient {
}
info!("Starting topology refresher...");
topology_refresher.start();
topology_refresher.start(shutdown);
}
// controller for sending sphinx packets to mixnet (either real traffic or cover traffic)
@@ -245,9 +258,10 @@ impl NymClient {
&mut self,
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
shutdown: ShutdownListener,
) {
info!("Starting mix traffic controller...");
MixTrafficController::new(mix_rx, gateway_client).start();
MixTrafficController::new(mix_rx, gateway_client, shutdown).start();
}
fn start_websocket_listener(
@@ -308,20 +322,26 @@ impl NymClient {
/// blocking version of `start` method. Will run forever (or until SIGINT is sent)
pub async fn run_forever(&mut self) {
self.start().await;
if let Err(e) = tokio::signal::ctrl_c().await {
error!(
"There was an error while capturing SIGINT - {:?}. We will terminate regardless",
e
);
}
let shutdown = self.start().await;
wait_for_signal().await;
println!(
"Received SIGINT - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
"Received signal - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
);
log::info!("Sending shutdown");
shutdown.signal_shutdown().ok();
// Some of these components have shutdown signalling implemented as part of socks5 work,
// but since it's not fully implemented (yet) for all the components of the native client,
// we don't try to wait and instead just stop immediately.
//log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
//shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-client");
}
pub async fn start(&mut self) {
pub async fn start(&mut self) -> ShutdownNotifier {
info!("Starting nym client");
// channels for inter-component communication
// TODO: make the channels be internally created by the relevant components
@@ -351,30 +371,43 @@ impl NymClient {
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
.expect("Failed to load reply key storage!");
// Shutdown notifier for signalling tasks to stop
let shutdown = ShutdownNotifier::default();
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
self.start_topology_refresher(shared_topology_accessor.clone())
self.start_topology_refresher(shared_topology_accessor.clone(), shutdown.subscribe())
.await;
self.start_received_messages_buffer_controller(
received_buffer_request_receiver,
mixnet_messages_receiver,
reply_key_storage.clone(),
shutdown.subscribe(),
);
let gateway_client = self
.start_gateway_client(mixnet_messages_sender, ack_sender)
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
.await;
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
self.start_mix_traffic_controller(
sphinx_message_receiver,
gateway_client,
shutdown.subscribe(),
);
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
shutdown.subscribe(),
);
self.start_cover_traffic_stream(shared_topology_accessor, sphinx_message_sender);
self.start_cover_traffic_stream(
shared_topology_accessor,
sphinx_message_sender,
shutdown.subscribe(),
);
match self.config.get_socket_type() {
SocketType::WebSocket => {
@@ -399,5 +432,7 @@ impl NymClient {
info!("Client startup finished!");
info!("The address of this client is: {}", self.as_mix_recipient());
shutdown
}
}
+6 -5
View File
@@ -26,21 +26,22 @@ url = "2.2"
# internal
client-core = { path = "../client-core" }
coconut-interface = { path = "../../common/coconut-interface", optional = true }
credentials = { path = "../../common/credentials", optional = true }
credential-storage = { path = "../../common/credential-storage" }
config = { path = "../../common/config" }
credential-storage = { path = "../../common/credential-storage" }
credentials = { path = "../../common/credentials", optional = true }
crypto = { path = "../../common/crypto" }
gateway-client = { path = "../../common/client-libs/gateway-client" }
gateway-requests = { path = "../../gateway/gateway-requests" }
network-defaults = { path = "../../common/network-defaults" }
nymsphinx = { path = "../../common/nymsphinx" }
ordered-buffer = { path = "../../common/socks5/ordered-buffer" }
socks5-requests = { path = "../../common/socks5/requests" }
topology = { path = "../../common/topology" }
pemstore = { path = "../../common/pemstore" }
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
socks5-requests = { path = "../../common/socks5/requests" }
task = { path = "../../common/task" }
topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path = "../../common/version-checker" }
network-defaults = { path = "../../common/network-defaults" }
[features]
coconut = ["coconut-interface", "credentials", "gateway-requests/coconut", "gateway-client/coconut", "credentials/coconut", "client-core/coconut"]
+72 -22
View File
@@ -1,6 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::AtomicBool;
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
@@ -29,6 +31,7 @@ use gateway_client::{
use log::*;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use crate::client::config::Config;
use crate::socks::{
@@ -38,6 +41,8 @@ use crate::socks::{
pub mod config;
pub static SHUTDOWN_IS_SIGNALLED: AtomicBool = AtomicBool::new(false);
// Channels used to control the main task from outside
pub type Socks5ControlMessageSender = mpsc::UnboundedSender<Socks5ControlMessage>;
pub type Socks5ControlMessageReceiver = mpsc::UnboundedReceiver<Socks5ControlMessage>;
@@ -84,6 +89,7 @@ impl NymClient {
&self,
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
info!("Starting loop cover traffic stream...");
@@ -97,6 +103,7 @@ impl NymClient {
mix_tx,
self.as_mix_recipient(),
topology_accessor,
shutdown,
)
.start();
}
@@ -108,6 +115,7 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
shutdown: ShutdownListener,
) {
let controller_config = client_core::client::real_messages_control::Config::new(
self.key_manager.ack_key(),
@@ -128,6 +136,7 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
shutdown,
)
.start();
}
@@ -139,6 +148,7 @@ impl NymClient {
query_receiver: ReceivedBufferRequestReceiver,
mixnet_receiver: MixnetMessageReceiver,
reply_key_storage: ReplyKeyStorage,
shutdown: ShutdownListener,
) {
info!("Starting received messages buffer controller...");
ReceivedMessagesBufferController::new(
@@ -146,6 +156,7 @@ impl NymClient {
query_receiver,
mixnet_receiver,
reply_key_storage,
shutdown,
)
.start()
}
@@ -154,6 +165,7 @@ impl NymClient {
&mut self,
mixnet_message_sender: MixnetMessageSender,
ack_sender: AcknowledgementSender,
shutdown: ShutdownListener,
) -> GatewayClient {
let gateway_id = self.config.get_base().get_gateway_id();
if gateway_id.is_empty() {
@@ -196,6 +208,7 @@ impl NymClient {
ack_sender,
self.config.get_base().get_gateway_response_timeout(),
Some(bandwidth_controller),
Some(shutdown),
);
gateway_client
@@ -211,7 +224,11 @@ impl NymClient {
// future responsible for periodically polling directory server and updating
// the current global view of topology
async fn start_topology_refresher(&mut self, topology_accessor: TopologyAccessor) {
async fn start_topology_refresher(
&mut self,
topology_accessor: TopologyAccessor,
shutdown: ShutdownListener,
) {
let topology_refresher_config = TopologyRefresherConfig::new(
self.config.get_base().get_validator_api_endpoints(),
self.config.get_base().get_topology_refresh_rate(),
@@ -233,7 +250,7 @@ impl NymClient {
}
info!("Starting topology refresher...");
topology_refresher.start();
topology_refresher.start(shutdown);
}
// controller for sending sphinx packets to mixnet (either real traffic or cover traffic)
@@ -244,15 +261,17 @@ impl NymClient {
&mut self,
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
shutdown: ShutdownListener,
) {
info!("Starting mix traffic controller...");
MixTrafficController::new(mix_rx, gateway_client).start();
MixTrafficController::new(mix_rx, gateway_client, shutdown).start();
}
fn start_socks5_listener(
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
shutdown: ShutdownListener,
) {
info!("Starting socks5 listener...");
let auth_methods = vec![AuthenticationMethods::NoAuth as u8];
@@ -264,43 +283,55 @@ impl NymClient {
authenticator,
self.config.get_provider_mix_address(),
self.as_mix_recipient(),
shutdown,
);
tokio::spawn(async move { sphinx_socks.serve(msg_input, buffer_requester).await });
}
/// blocking version of `start` method. Will run forever (or until SIGINT is sent)
pub async fn run_forever(&mut self) {
self.start().await;
if let Err(e) = tokio::signal::ctrl_c().await {
error!(
"There was an error while capturing SIGINT - {:?}. We will terminate regardless",
e
);
}
let mut shutdown = self.start().await;
wait_for_signal().await;
println!(
"Received SIGINT - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
);
log::info!("Sending shutdown");
SHUTDOWN_IS_SIGNALLED.store(true, std::sync::atomic::Ordering::Relaxed);
client_core::client::SHUTDOWN_IS_SIGNALLED.store(true, std::sync::atomic::Ordering::Relaxed);
shutdown.signal_shutdown().ok();
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-socks5-client");
}
// Variant of `run_forever` that listends for remote control messages
pub async fn run_and_listen(&mut self, mut receiver: Socks5ControlMessageReceiver) {
self.start().await;
let mut shutdown = self.start().await;
tokio::select! {
message = receiver.next() => {
log::debug!("Received message: {:?}", message);
match message {
Some(Socks5ControlMessage::Stop) => {
log::info!("Shutting down");
log::info!("Graceful shutdown of tasks not yet implemented, you might see (harmless) panics until then");
log::info!("Received stop message");
}
None => log::debug!("None"),
}
}
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
}
log::info!("Sending shutdown");
shutdown.signal_shutdown().ok();
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-socks5-client");
}
pub async fn start(&mut self) {
pub async fn start(&mut self) -> ShutdownNotifier {
info!("Starting nym client");
// channels for inter-component communication
// TODO: make the channels be internally created by the relevant components
@@ -330,33 +361,52 @@ impl NymClient {
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
.expect("Failed to load reply key storage!");
// Shutdown notifier for signalling tasks to stop
let shutdown = ShutdownNotifier::default();
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
self.start_topology_refresher(shared_topology_accessor.clone())
self.start_topology_refresher(shared_topology_accessor.clone(), shutdown.subscribe())
.await;
self.start_received_messages_buffer_controller(
received_buffer_request_receiver,
mixnet_messages_receiver,
reply_key_storage.clone(),
shutdown.subscribe(),
);
let gateway_client = self
.start_gateway_client(mixnet_messages_sender, ack_sender)
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
.await;
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
self.start_mix_traffic_controller(
sphinx_message_receiver,
gateway_client,
shutdown.subscribe(),
);
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
shutdown.subscribe(),
);
self.start_cover_traffic_stream(shared_topology_accessor, sphinx_message_sender);
self.start_socks5_listener(received_buffer_request_sender, input_sender);
self.start_cover_traffic_stream(
shared_topology_accessor,
sphinx_message_sender,
shutdown.subscribe(),
);
self.start_socks5_listener(
received_buffer_request_sender,
input_sender,
shutdown.subscribe(),
);
info!("Client startup finished!");
info!("The address of this client is: {}", self.as_mix_recipient());
shutdown
}
}
+5
View File
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use task::ShutdownListener;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::{self, net::TcpStream};
@@ -140,6 +141,7 @@ pub(crate) struct SocksClient {
service_provider: Recipient,
self_address: Recipient,
started_proxy: bool,
shutdown_listener: ShutdownListener,
}
impl Drop for SocksClient {
@@ -163,6 +165,7 @@ impl SocksClient {
service_provider: Recipient,
controller_sender: ControllerSender,
self_address: Recipient,
shutdown_listener: ShutdownListener,
) -> Self {
let connection_id = Self::generate_random();
SocksClient {
@@ -176,6 +179,7 @@ impl SocksClient {
service_provider,
self_address,
started_proxy: false,
shutdown_listener,
}
}
@@ -250,6 +254,7 @@ impl SocksClient {
conn_receiver,
input_sender,
connection_id,
self.shutdown_listener.clone(),
)
.run(move |conn_id, read_data, socket_closed| {
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
+16 -4
View File
@@ -6,11 +6,13 @@ use log::*;
use nymsphinx::receiver::ReconstructedMessage;
use proxy_helpers::connection_controller::{ControllerCommand, ControllerSender};
use socks5_requests::Message;
use task::ShutdownListener;
pub(crate) struct MixnetResponseListener {
buffer_requester: ReceivedBufferRequestSender,
mix_response_receiver: ReconstructedMessagesReceiver,
controller_sender: ControllerSender,
shutdown: ShutdownListener,
}
impl Drop for MixnetResponseListener {
@@ -25,6 +27,7 @@ impl MixnetResponseListener {
pub(crate) fn new(
buffer_requester: ReceivedBufferRequestSender,
controller_sender: ControllerSender,
shutdown: ShutdownListener,
) -> Self {
let (mix_response_sender, mix_response_receiver) = mpsc::unbounded();
buffer_requester
@@ -35,6 +38,7 @@ impl MixnetResponseListener {
buffer_requester,
mix_response_receiver,
controller_sender,
shutdown,
}
}
@@ -73,11 +77,19 @@ impl MixnetResponseListener {
}
pub(crate) async fn run(&mut self) {
while let Some(received_responses) = self.mix_response_receiver.next().await {
for reconstructed_message in received_responses {
self.on_message(reconstructed_message).await;
while !self.shutdown.is_shutdown() {
tokio::select! {
Some(received_responses) = self.mix_response_receiver.next() => {
for reconstructed_message in received_responses {
self.on_message(reconstructed_message).await;
}
},
_ = self.shutdown.recv() => {
log::trace!("MixnetResponseListener: Received shutdown");
}
}
}
error!("We should never see this message");
let a = crate::client::SHUTDOWN_IS_SIGNALLED.load(std::sync::atomic::Ordering::Relaxed);
log::debug!("MixnetResponseListener: Exiting");
}
}
+55 -41
View File
@@ -11,6 +11,7 @@ use log::*;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::Controller;
use std::net::SocketAddr;
use task::ShutdownListener;
use tokio::net::TcpListener;
/// A Socks5 server that listens for connections.
@@ -19,6 +20,7 @@ pub struct SphinxSocksServer {
listening_address: SocketAddr,
service_provider: Recipient,
self_address: Recipient,
shutdown: ShutdownListener,
}
impl SphinxSocksServer {
@@ -28,6 +30,7 @@ impl SphinxSocksServer {
authenticator: Authenticator,
service_provider: Recipient,
self_address: Recipient,
shutdown: ShutdownListener,
) -> Self {
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
// just modify the config
@@ -38,6 +41,7 @@ impl SphinxSocksServer {
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
service_provider,
self_address,
shutdown,
}
}
@@ -58,56 +62,66 @@ impl SphinxSocksServer {
});
// listener for mix messages
let mut mixnet_response_listener =
MixnetResponseListener::new(buffer_requester, controller_sender.clone());
let mut mixnet_response_listener = MixnetResponseListener::new(
buffer_requester,
controller_sender.clone(),
self.shutdown.clone(),
);
tokio::spawn(async move {
mixnet_response_listener.run().await;
});
loop {
if let Ok((stream, _remote)) = listener.accept().await {
// TODO Optimize this
let mut client = SocksClient::new(
stream,
self.authenticator.clone(),
input_sender.clone(),
self.service_provider,
controller_sender.clone(),
self.self_address,
);
tokio::select! {
Ok((stream, _remote)) = listener.accept() => {
// TODO Optimize this
let mut client = SocksClient::new(
stream,
self.authenticator.clone(),
input_sender.clone(),
self.service_provider,
controller_sender.clone(),
self.self_address,
self.shutdown.clone(),
);
tokio::spawn(async move {
{
match client.run().await {
Ok(_) => {}
Err(error) => {
error!("Error! {}", error);
let error_text = format!("{}", error);
tokio::spawn(async move {
{
match client.run().await {
Ok(_) => {}
Err(error) => {
error!("Error! {}", error);
let error_text = format!("{}", error);
let response: ResponseCode;
let response: ResponseCode;
if error_text.contains("Host") {
response = ResponseCode::HostUnreachable;
} else if error_text.contains("Network") {
response = ResponseCode::NetworkUnreachable;
} else if error_text.contains("ttl") {
response = ResponseCode::TtlExpired
} else {
response = ResponseCode::Failure
if error_text.contains("Host") {
response = ResponseCode::HostUnreachable;
} else if error_text.contains("Network") {
response = ResponseCode::NetworkUnreachable;
} else if error_text.contains("ttl") {
response = ResponseCode::TtlExpired
} else {
response = ResponseCode::Failure
}
if client.error(response).await.is_err() {
warn!("Failed to send error code");
};
if client.shutdown().await.is_err() {
warn!("Failed to shutdown TcpStream");
};
}
if client.error(response).await.is_err() {
warn!("Failed to send error code");
};
if client.shutdown().await.is_err() {
warn!("Failed to shutdown TcpStream");
};
}
};
// client gets dropped here
}
});
};
// client gets dropped here
}
});
},
_ = self.shutdown.recv() => {
log::trace!("SphinxSocksServer: Received shutdown");
log::debug!("SphinxSocksServer: Exiting");
return Ok(());
}
}
}
}
+7 -2
View File
@@ -20,13 +20,14 @@ web3 = { version = "0.17.0", default-features = false }
async-trait = { version = "0.1.51" }
# internal
coconut-interface = { path = "../../coconut-interface", optional = true }
credentials = { path = "../../credentials" }
crypto = { path = "../../crypto" }
gateway-requests = { path = "../../../gateway/gateway-requests" }
network-defaults = { path = "../../network-defaults" }
nymsphinx = { path = "../../nymsphinx" }
pemstore = { path = "../../pemstore" }
coconut-interface = { path = "../../coconut-interface", optional = true }
network-defaults = { path = "../../network-defaults" }
task = { path = "../../task" }
validator-client = { path = "../validator-client", optional = true }
[dependencies.tungstenite]
@@ -38,6 +39,10 @@ default-features = false
version = "1.19.1"
features = ["macros", "rt", "net", "sync", "time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
version = "0.1.9"
features = ["net", "sync", "time"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
version = "0.14"
@@ -30,6 +30,7 @@ use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use task::ShutdownListener;
use tungstenite::protocol::Message;
#[cfg(not(target_arch = "wasm32"))]
@@ -65,6 +66,10 @@ pub struct GatewayClient {
reconnection_attempts: usize,
/// Delay between each subsequent reconnection attempt.
reconnection_backoff: Duration,
/// Listen to shutdown messages. This is an option since we don't require it when for example
/// when doing initial authentication.
shutdown: Option<ShutdownListener>,
}
impl GatewayClient {
@@ -80,6 +85,7 @@ impl GatewayClient {
ack_sender: AcknowledgementSender,
response_timeout_duration: Duration,
bandwidth_controller: Option<BandwidthController<PersistentStorage>>,
shutdown: Option<ShutdownListener>,
) -> Self {
GatewayClient {
authenticated: false,
@@ -97,6 +103,7 @@ impl GatewayClient {
should_reconnect_on_failure: true,
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
shutdown,
}
}
@@ -148,6 +155,7 @@ impl GatewayClient {
should_reconnect_on_failure: false,
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
shutdown: None,
}
}
@@ -298,7 +306,7 @@ impl GatewayClient {
}
_ => (),
}
}
}
}
}
}
@@ -723,6 +731,7 @@ impl GatewayClient {
.as_ref()
.expect("no shared key present even though we're authenticated!"),
),
self.shutdown.clone(),
)
}
_ => unreachable!(),
@@ -11,6 +11,7 @@ use gateway_requests::registration::handshake::SharedKeys;
use gateway_requests::BinaryResponse;
use log::*;
use std::sync::Arc;
use task::ShutdownListener;
use tungstenite::Message;
#[cfg(not(target_arch = "wasm32"))]
@@ -86,6 +87,7 @@ impl PartiallyDelegated {
conn: WsConn,
packet_router: PacketRouter,
shared_key: Arc<SharedKeys>,
shutdown: Option<ShutdownListener>,
) -> Self {
// when called for, it NEEDS TO yield back the stream so that we could merge it and
// read control request responses.
@@ -98,9 +100,15 @@ impl PartiallyDelegated {
let mut fused_receiver = notify_receiver.fuse();
let mut fused_stream = (&mut stream).fuse();
// Bit of an ugly workaround for selecting on an `Option`. The unwrap is lazy so we use
// this bool inside the `select` to guard against unwrapping in the `None` case.
let shutdown_is_some = shutdown.is_some();
let shutdown_recv_lazy = async { shutdown.unwrap().recv().await };
tokio::pin!(shutdown_recv_lazy);
let ret_err = loop {
futures::select! {
_ = fused_receiver => {
tokio::select! {
_ = &mut fused_receiver => {
break Ok(());
}
msg = fused_stream.next() => {
@@ -110,6 +118,11 @@ impl PartiallyDelegated {
};
Self::route_socket_message(ws_msg, &packet_router, shared_key.as_ref());
}
_ = &mut shutdown_recv_lazy, if shutdown_is_some => {
log::trace!("GatewayClient listener: Received shutdown");
log::debug!("GatewayClient listener: Exiting");
return;
}
};
};
+1
View File
@@ -16,6 +16,7 @@ futures = "0.3"
log = "0.4"
socks5-requests = { path = "../requests" }
ordered-buffer = { path = "../ordered-buffer" }
task = { path = "../../task" }
[dev-dependencies]
tokio-test = "0.4.2"
@@ -170,16 +170,23 @@ impl Controller {
}
pub async fn run(&mut self) {
while let Some(command) = self.receiver.next().await {
match command {
ControllerCommand::Send(conn_id, data, is_closed) => {
self.send_to_connection(conn_id, data, is_closed)
loop {
tokio::select! {
command = self.receiver.next() => match command {
Some(ControllerCommand::Send(conn_id, data, is_closed)) => {
self.send_to_connection(conn_id, data, is_closed)
}
Some(ControllerCommand::Insert(conn_id, sender)) => {
self.insert_connection(conn_id, sender)
}
Some(ControllerCommand::Remove(conn_id)) => self.remove_connection(conn_id),
None => {
log::trace!("SOCKS5 Controller: Stopping since channel closed");
break;
}
}
ControllerCommand::Insert(conn_id, sender) => {
self.insert_connection(conn_id, sender)
}
ControllerCommand::Remove(conn_id) => self.remove_connection(conn_id),
}
}
log::debug!("SOCKS5 Controller: Exiting");
}
}
@@ -11,6 +11,7 @@ use log::*;
use ordered_buffer::OrderedMessageSender;
use socks5_requests::ConnectionId;
use std::{io, sync::Arc};
use task::ShutdownListener;
use tokio::select;
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
@@ -74,6 +75,7 @@ where
is_finished
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_inbound<F, S>(
mut reader: OwnedReadHalf,
local_destination_address: String, // addresses are provided for better logging
@@ -82,6 +84,7 @@ pub(super) async fn run_inbound<F, S>(
mix_sender: MixProxySender<S>,
adapter_fn: F,
shutdown_notify: Arc<Notify>,
mut shutdown_listener: ShutdownListener,
) -> OwnedReadHalf
where
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
@@ -106,6 +109,10 @@ where
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
break;
}
_ = shutdown_listener.recv() => {
log::trace!("ProxyRunner inbound: Received shutdown");
break;
}
}
}
trace!("{} - inbound closed", connection_id);
@@ -5,6 +5,7 @@ use crate::connection_controller::ConnectionReceiver;
use futures::channel::mpsc;
use socks5_requests::ConnectionId;
use std::{sync::Arc, time::Duration};
use task::ShutdownListener;
use tokio::{net::TcpStream, sync::Notify};
mod inbound;
@@ -44,6 +45,9 @@ pub struct ProxyRunner<S> {
local_destination_address: String,
remote_source_address: String,
connection_id: ConnectionId,
// Listens to shutdown commands from higher up
shutdown_listener: ShutdownListener,
}
impl<S> ProxyRunner<S>
@@ -57,6 +61,7 @@ where
mix_receiver: ConnectionReceiver,
mix_sender: MixProxySender<S>,
connection_id: ConnectionId,
shutdown_listener: ShutdownListener,
) -> Self {
ProxyRunner {
mix_receiver: Some(mix_receiver),
@@ -65,6 +70,7 @@ where
local_destination_address,
remote_source_address,
connection_id,
shutdown_listener,
}
}
@@ -86,6 +92,7 @@ where
self.mix_sender.clone(),
adapter_fn,
Arc::clone(&shutdown_notify),
self.shutdown_listener.clone(),
);
let outbound_future = outbound::run_outbound(
@@ -95,6 +102,7 @@ where
self.mix_receiver.take().unwrap(),
self.connection_id,
shutdown_notify,
self.shutdown_listener.clone(),
);
// TODO: this shouldn't really have to spawn tasks inside "library" code, but
@@ -8,6 +8,7 @@ use futures::StreamExt;
use log::*;
use socks5_requests::ConnectionId;
use std::{sync::Arc, time::Duration};
use task::ShutdownListener;
use tokio::io::AsyncWriteExt;
use tokio::select;
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
@@ -50,6 +51,7 @@ pub(super) async fn run_outbound(
mut mix_receiver: ConnectionReceiver,
connection_id: ConnectionId,
shutdown_notify: Arc<Notify>,
mut shutdown_listener: ShutdownListener,
) -> (OwnedWriteHalf, ConnectionReceiver) {
let shutdown_future = shutdown_notify.notified().then(|_| sleep(SHUTDOWN_TIMEOUT));
tokio::pin!(shutdown_future);
@@ -78,6 +80,10 @@ pub(super) async fn run_outbound(
debug!("closing outbound proxy after inbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
break;
}
_ = shutdown_listener.recv() => {
log::trace!("ProxyRunner outbound: Received shutdown");
break;
}
}
}
+2
View File
@@ -2,5 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
pub mod shutdown;
pub mod signal;
pub use shutdown::{ShutdownListener, ShutdownNotifier};
pub use signal::wait_for_signal;
+19
View File
@@ -92,6 +92,25 @@ impl ShutdownListener {
let _ = self.notify.changed().await;
self.shutdown = true;
}
pub fn is_shutdown_poll(&mut self) -> bool {
if self.shutdown {
return true;
}
match self.notify.has_changed() {
Ok(has_changed) => {
if has_changed {
self.shutdown = true;
}
has_changed
}
Err(err) => {
log::debug!("Polling shutdown failed: {err}");
log::debug!("Assuming this means we should shutdown...");
true
}
}
}
}
#[cfg(test)]
+27
View File
@@ -0,0 +1,27 @@
#[cfg(unix)]
pub async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
_ = sigterm.recv() => {
log::info!("Received SIGTERM");
}
_ = sigquit.recv() => {
log::info!("Received SIGQUIT");
}
}
}
#[cfg(not(unix))]
pub async fn wait_for_signal() {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
}
}
+1 -29
View File
@@ -26,7 +26,7 @@ use rand::thread_rng;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use task::{ShutdownListener, ShutdownNotifier};
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use version_checker::parse_version;
mod http;
@@ -334,31 +334,3 @@ impl MixNode {
self.wait_for_interrupt(shutdown).await
}
}
#[cfg(unix)]
async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
_ = sigterm.recv() => {
log::info!("Received SIGTERM");
}
_ = sigquit.recv() => {
log::info!("Received SIGQUIT");
}
}
}
#[cfg(not(unix))]
async fn wait_for_signal() {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
},
}
}
+15
View File
@@ -630,6 +630,7 @@ dependencies = [
"serde",
"sled",
"tap",
"task",
"thiserror",
"tokio",
"topology",
@@ -1921,8 +1922,10 @@ dependencies = [
"pemstore",
"rand 0.7.3",
"secp256k1",
"task",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tungstenite",
"url",
@@ -3461,6 +3464,7 @@ dependencies = [
"serde",
"snafu",
"socks5-requests",
"task",
"tokio",
"topology",
"url",
@@ -4268,6 +4272,7 @@ dependencies = [
"log",
"ordered-buffer",
"socks5-requests",
"task",
"tokio",
"tokio-util 0.7.3",
]
@@ -5626,6 +5631,15 @@ dependencies = [
"xattr",
]
[[package]]
name = "task"
version = "0.1.0"
dependencies = [
"log",
"tokio",
"tokio-util 0.7.3",
]
[[package]]
name = "tauri"
version = "1.0.3"
@@ -6072,6 +6086,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.7.3",
]
[[package]]
@@ -34,4 +34,5 @@ ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
socks5-requests = { path = "../../common/socks5/requests" }
statistics-common = { path = "../../common/statistics" }
task = { path = "../../common/task" }
websocket-requests = { path = "../../clients/native/websocket-requests" }
@@ -7,6 +7,7 @@ use proxy_helpers::connection_controller::ConnectionReceiver;
use proxy_helpers::proxy_runner::ProxyRunner;
use socks5_requests::{ConnectionId, Message as Socks5Message, RemoteAddress, Response};
use std::io;
use task::ShutdownListener;
use tokio::net::TcpStream;
/// A TCP connection between the Socks5 service provider, which makes
@@ -40,6 +41,7 @@ impl Connection {
&mut self,
mix_receiver: ConnectionReceiver,
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
shutdown: ShutdownListener,
) {
let stream = self.conn.take().unwrap();
let remote_source_address = "???".to_string(); // we don't know ip address of requester
@@ -52,6 +54,7 @@ impl Connection {
mix_receiver,
mix_sender,
connection_id,
shutdown,
)
.run(move |conn_id, read_data, socket_closed| {
(
@@ -19,6 +19,7 @@ use socks5_requests::{
use statistics_common::collector::StatisticsSender;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use task::ShutdownListener;
use tokio_tungstenite::tungstenite::protocol::Message;
use websocket::WebsocketConnectionError;
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
@@ -134,6 +135,7 @@ impl ServiceProvider {
return_address: Recipient,
controller_sender: ControllerSender,
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
shutdown: ShutdownListener,
) {
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
Ok(conn) => conn,
@@ -170,7 +172,8 @@ impl ServiceProvider {
);
// run the proxy on the connection
conn.run_proxy(mix_receiver, mix_input_sender).await;
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
.await;
// proxy is done - remove the access channel from the controller
controller_sender
@@ -192,6 +195,7 @@ impl ServiceProvider {
conn_id: ConnectionId,
remote_addr: String,
return_address: Recipient,
shutdown: ShutdownListener,
) {
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
@@ -218,6 +222,7 @@ impl ServiceProvider {
return_address,
controller_sender_clone,
mix_input_sender_clone,
shutdown,
)
.await
});
@@ -241,6 +246,7 @@ impl ServiceProvider {
controller_sender: &mut ControllerSender,
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
stats_collector: Option<ServiceStatisticsCollector>,
shutdown: ShutdownListener,
) {
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
Ok(msg) => msg,
@@ -265,6 +271,7 @@ impl ServiceProvider {
req.conn_id,
req.remote_addr,
req.return_address,
shutdown,
)
}
@@ -302,7 +309,10 @@ impl ServiceProvider {
let (mix_input_sender, mix_input_receiver) =
mpsc::unbounded::<(Socks5Message, Recipient)>();
// controller for managing all active connections
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented.
let shutdown = task::ShutdownNotifier::default();
let (mut active_connections_controller, mut controller_sender) = Controller::new();
tokio::spawn(async move {
active_connections_controller.run().await;
@@ -353,6 +363,7 @@ impl ServiceProvider {
&mut controller_sender,
&mix_input_sender,
stats_collector.clone(),
shutdown.subscribe(),
)
.await;
}
@@ -209,6 +209,7 @@ impl PacketSender {
// currently we do not care about acks at all, but we must keep the channel alive
// so that the gateway client would not crash
let (ack_sender, ack_receiver) = mpsc::unbounded();
let mut gateway_client = GatewayClient::new(
address,
Arc::clone(&fresh_gateway_client_data.local_identity),
@@ -219,6 +220,7 @@ impl PacketSender {
ack_sender,
fresh_gateway_client_data.gateway_response_timeout,
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
None,
);
gateway_client