Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 73955ecfa3 | |||
| 06ac21e21b | |||
| bb145a1cc3 | |||
| 5b0d5ac2ba | |||
| 9a0086b5e8 |
@@ -12,10 +12,12 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
### Changed
|
||||
|
||||
- validator-client: made `fee` argument optional for `execute` and `execute_multiple` ([#1541])
|
||||
- socks5 client: graceful shutdown should fix error on disconnect in nym-connect ([#1591])
|
||||
|
||||
[#1541]: https://github.com/nymtech/nym/pull/1541
|
||||
[#1558]: https://github.com/nymtech/nym/pull/1558
|
||||
[#1577]: https://github.com/nymtech/nym/pull/1577
|
||||
[#1591]: https://github.com/nymtech/nym/pull/1591
|
||||
|
||||
|
||||
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
||||
|
||||
Generated
+8
@@ -668,6 +668,7 @@ dependencies = [
|
||||
"serde",
|
||||
"sled",
|
||||
"tap",
|
||||
"task",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
@@ -2054,8 +2055,10 @@ dependencies = [
|
||||
"pemstore",
|
||||
"rand 0.7.3",
|
||||
"secp256k1",
|
||||
"task",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-tungstenite 0.14.0",
|
||||
"tungstenite 0.13.0",
|
||||
"url",
|
||||
@@ -3255,6 +3258,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sled",
|
||||
"task",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.14.0",
|
||||
"topology",
|
||||
@@ -3375,6 +3379,7 @@ dependencies = [
|
||||
"socks5-requests",
|
||||
"sqlx 0.6.1",
|
||||
"statistics-common",
|
||||
"task",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-tungstenite 0.17.2",
|
||||
@@ -3424,6 +3429,7 @@ dependencies = [
|
||||
"serde",
|
||||
"snafu",
|
||||
"socks5-requests",
|
||||
"task",
|
||||
"tokio",
|
||||
"topology",
|
||||
"url",
|
||||
@@ -4258,6 +4264,7 @@ dependencies = [
|
||||
"log",
|
||||
"ordered-buffer",
|
||||
"socks5-requests",
|
||||
"task",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util 0.7.3",
|
||||
@@ -6138,6 +6145,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -26,6 +26,7 @@ gateway-requests = { path = "../../gateway/gateway-requests" }
|
||||
nonexhaustive-delayqueue = { path = "../../common/nonexhaustive-delayqueue" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
pemstore = { path = "../../common/pemstore" }
|
||||
task = { path = "../../common/task" }
|
||||
topology = { path = "../../common/topology" }
|
||||
validator-client = { path = "../../common/client-libs/validator-client" }
|
||||
tap = "1.0.1"
|
||||
|
||||
@@ -13,6 +13,7 @@ use nymsphinx::utils::sample_poisson_duration;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time;
|
||||
|
||||
@@ -48,6 +49,9 @@ where
|
||||
|
||||
/// Accessor to the common instance of network topology.
|
||||
topology_access: TopologyAccessor,
|
||||
|
||||
/// Listen to shutdown signals.
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl<R> Stream for LoopCoverTrafficStream<R>
|
||||
@@ -84,6 +88,7 @@ where
|
||||
// obviously when we finally make shared rng that is on 'higher' level, this should become
|
||||
// generic `R`
|
||||
impl LoopCoverTrafficStream<OsRng> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
ack_key: Arc<AckKey>,
|
||||
average_ack_delay: time::Duration,
|
||||
@@ -92,6 +97,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
mix_tx: BatchMixMessageSender,
|
||||
our_full_destination: Recipient,
|
||||
topology_access: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
@@ -105,6 +111,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
our_full_destination,
|
||||
rng,
|
||||
topology_access,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,9 +166,21 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
self.average_cover_message_sending_delay,
|
||||
)));
|
||||
|
||||
while self.next().await.is_some() {
|
||||
self.on_new_message().await;
|
||||
let mut shutdown = self.shutdown.clone();
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("LoopCoverTrafficStream: Received shutdown");
|
||||
}
|
||||
next = self.next() => {
|
||||
if next.is_some() {
|
||||
self.on_new_message().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("LoopCoverTrafficStream: Exiting");
|
||||
}
|
||||
|
||||
pub fn start(mut self) -> JoinHandle<()> {
|
||||
|
||||
@@ -6,6 +6,7 @@ use futures::StreamExt;
|
||||
use gateway_client::GatewayClient;
|
||||
use log::*;
|
||||
use nymsphinx::forwarding::packet::MixPacket;
|
||||
use task::ShutdownListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
|
||||
@@ -18,6 +19,7 @@ pub struct MixTrafficController {
|
||||
// later on gateway_client will need to be accessible by other entities
|
||||
gateway_client: GatewayClient,
|
||||
mix_rx: BatchMixMessageReceiver,
|
||||
shutdown: ShutdownListener,
|
||||
|
||||
// TODO: this is temporary work-around.
|
||||
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
|
||||
@@ -28,10 +30,12 @@ impl MixTrafficController {
|
||||
pub fn new(
|
||||
mix_rx: BatchMixMessageReceiver,
|
||||
gateway_client: GatewayClient,
|
||||
shutdown: ShutdownListener,
|
||||
) -> MixTrafficController {
|
||||
MixTrafficController {
|
||||
gateway_client,
|
||||
mix_rx,
|
||||
shutdown,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
}
|
||||
}
|
||||
@@ -66,9 +70,23 @@ impl MixTrafficController {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
while let Some(mix_packets) = self.mix_rx.next().await {
|
||||
self.on_messages(mix_packets).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
mix_packets = self.mix_rx.next() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
self.on_messages(mix_packets).await;
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("MixTrafficController: Exiting");
|
||||
}
|
||||
|
||||
pub fn start(mut self) -> JoinHandle<()> {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
pub mod cover_traffic_stream;
|
||||
pub mod inbound_messages;
|
||||
pub mod key_manager;
|
||||
@@ -6,3 +8,5 @@ pub mod real_messages_control;
|
||||
pub mod received_buffer;
|
||||
pub mod reply_key_storage;
|
||||
pub mod topology_control;
|
||||
|
||||
pub static SHUTDOWN_IS_SIGNALLED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
+16
-5
@@ -10,6 +10,7 @@ use nymsphinx::{
|
||||
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
|
||||
/// Module responsible for listening for any data resembling acknowledgements from the network
|
||||
/// and firing actions to remove them from the 'Pending' state.
|
||||
@@ -17,6 +18,7 @@ pub(super) struct AcknowledgementListener {
|
||||
ack_key: Arc<AckKey>,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
action_sender: ActionSender,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl AcknowledgementListener {
|
||||
@@ -24,11 +26,13 @@ impl AcknowledgementListener {
|
||||
ack_key: Arc<AckKey>,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
action_sender: ActionSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
AcknowledgementListener {
|
||||
ack_key,
|
||||
ack_receiver,
|
||||
action_sender,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,12 +69,19 @@ impl AcknowledgementListener {
|
||||
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started AcknowledgementListener");
|
||||
while let Some(acks) = self.ack_receiver.next().await {
|
||||
// realistically we would only be getting one ack at the time
|
||||
for ack in acks {
|
||||
self.on_ack(ack).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
Some(acks) = self.ack_receiver.next() => {
|
||||
// realistically we would only be getting one ack at the time
|
||||
for ack in acks {
|
||||
self.on_ack(ack).await;
|
||||
}
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("AcknowledgementListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("TODO: error msg. Or maybe panic?")
|
||||
log::debug!("AcknowledgementListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+13
-3
@@ -12,6 +12,7 @@ use nymsphinx::Delay as SphinxDelay;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task::ShutdownListener;
|
||||
|
||||
pub(crate) type ActionSender = UnboundedSender<Action>;
|
||||
|
||||
@@ -99,12 +100,16 @@ pub(super) struct ActionController {
|
||||
|
||||
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
|
||||
retransmission_sender: RetransmissionRequestSender,
|
||||
|
||||
/// Listen for shutdown notifications
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl ActionController {
|
||||
pub(super) fn new(
|
||||
config: Config,
|
||||
retransmission_sender: RetransmissionRequestSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> (Self, ActionSender) {
|
||||
let (sender, receiver) = mpsc::unbounded();
|
||||
(
|
||||
@@ -114,6 +119,7 @@ impl ActionController {
|
||||
pending_acks_timers: NonExhaustiveDelayQueue::new(),
|
||||
incoming_actions: receiver,
|
||||
retransmission_sender,
|
||||
shutdown,
|
||||
},
|
||||
sender,
|
||||
)
|
||||
@@ -246,14 +252,18 @@ impl ActionController {
|
||||
}
|
||||
|
||||
pub(super) async fn run(&mut self) {
|
||||
loop {
|
||||
// at some point there will be a global shutdown signal here as the third option
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
// we NEVER expect for ANY sender to get dropped so unwrap here is fine
|
||||
action = self.incoming_actions.next() => self.process_action(action.unwrap()),
|
||||
// pending ack queue Stream CANNOT return a `None` so unwrap here is fine
|
||||
expired_ack = self.pending_acks_timers.next() => self.handle_expired_ack_timer(expired_ack.unwrap())
|
||||
expired_ack = self.pending_acks_timers.next() => self.handle_expired_ack_timer(expired_ack.unwrap()),
|
||||
// listen for shutdown notifications
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("ActionController: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("ActionController: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+14
-3
@@ -16,6 +16,7 @@ use nymsphinx::preparer::MessagePreparer;
|
||||
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
|
||||
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
|
||||
/// putting everything into sphinx packets, etc.
|
||||
@@ -32,6 +33,7 @@ where
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl<R> InputMessageListener<R>
|
||||
@@ -50,6 +52,7 @@ where
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
InputMessageListener {
|
||||
ack_key,
|
||||
@@ -60,6 +63,7 @@ where
|
||||
real_message_sender,
|
||||
topology_access,
|
||||
reply_key_storage,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,9 +186,16 @@ where
|
||||
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started InputMessageListener");
|
||||
while let Some(input_msg) = self.input_receiver.next().await {
|
||||
self.on_input_message(input_msg).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
Some(input_msg) = self.input_receiver.next() => {
|
||||
self.on_input_message(input_msg).await;
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("InputMessageListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("TODO: error msg. Or maybe panic?")
|
||||
log::debug!("InputMessageListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+13
-7
@@ -25,6 +25,7 @@ use std::{
|
||||
sync::{Arc, Weak},
|
||||
time::Duration,
|
||||
};
|
||||
use task::ShutdownListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
mod acknowledgement_listener;
|
||||
@@ -152,6 +153,7 @@ impl<R> AcknowledgementController<R>
|
||||
where
|
||||
R: 'static + CryptoRng + Rng + Clone + Send,
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
config: Config,
|
||||
rng: R,
|
||||
@@ -160,13 +162,14 @@ where
|
||||
ack_recipient: Recipient,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
connectors: AcknowledgementControllerConnectors,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
|
||||
|
||||
let action_config =
|
||||
action_controller::Config::new(config.ack_wait_addition, config.ack_wait_multiplier);
|
||||
let (action_controller, action_sender) =
|
||||
ActionController::new(action_config, retransmission_tx);
|
||||
ActionController::new(action_config, retransmission_tx, shutdown.clone());
|
||||
|
||||
let message_preparer = MessagePreparer::new(
|
||||
rng,
|
||||
@@ -180,6 +183,7 @@ where
|
||||
Arc::clone(&ack_key),
|
||||
connectors.ack_receiver,
|
||||
action_sender.clone(),
|
||||
shutdown.clone(),
|
||||
);
|
||||
|
||||
// will listen for any new messages from the client
|
||||
@@ -192,6 +196,7 @@ where
|
||||
connectors.real_message_sender.clone(),
|
||||
topology_access.clone(),
|
||||
reply_key_storage,
|
||||
shutdown.clone(),
|
||||
);
|
||||
|
||||
// will listen for any ack timeouts and trigger retransmission
|
||||
@@ -203,12 +208,13 @@ where
|
||||
connectors.real_message_sender,
|
||||
retransmission_rx,
|
||||
topology_access,
|
||||
shutdown.clone(),
|
||||
);
|
||||
|
||||
// will listen for events indicating the packet was sent through the network so that
|
||||
// the retransmission timer should be started.
|
||||
let sent_notification_listener =
|
||||
SentNotificationListener::new(connectors.sent_notifier, action_sender);
|
||||
SentNotificationListener::new(connectors.sent_notifier, action_sender, shutdown);
|
||||
|
||||
AcknowledgementController {
|
||||
acknowledgement_listener: Some(acknowledgement_listener),
|
||||
@@ -232,27 +238,27 @@ where
|
||||
// graceful shutdowns.
|
||||
let ack_listener_fut = tokio::spawn(async move {
|
||||
acknowledgement_listener.run().await;
|
||||
error!("The acknowledgement listener has finished execution!");
|
||||
debug!("The acknowledgement listener has finished execution!");
|
||||
acknowledgement_listener
|
||||
});
|
||||
let input_listener_fut = tokio::spawn(async move {
|
||||
input_message_listener.run().await;
|
||||
error!("The input listener has finished execution!");
|
||||
debug!("The input listener has finished execution!");
|
||||
input_message_listener
|
||||
});
|
||||
let retransmission_req_fut = tokio::spawn(async move {
|
||||
retransmission_request_listener.run().await;
|
||||
error!("The retransmission request listener has finished execution!");
|
||||
debug!("The retransmission request listener has finished execution!");
|
||||
retransmission_request_listener
|
||||
});
|
||||
let sent_notification_fut = tokio::spawn(async move {
|
||||
sent_notification_listener.run().await;
|
||||
error!("The sent notification listener has finished execution!");
|
||||
debug!("The sent notification listener has finished execution!");
|
||||
sent_notification_listener
|
||||
});
|
||||
let action_controller_fut = tokio::spawn(async move {
|
||||
action_controller.run().await;
|
||||
error!("The controller has finished execution!");
|
||||
debug!("The controller has finished execution!");
|
||||
action_controller
|
||||
});
|
||||
|
||||
|
||||
+16
-3
@@ -14,6 +14,7 @@ use nymsphinx::preparer::MessagePreparer;
|
||||
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::sync::{Arc, Weak};
|
||||
use task::ShutdownListener;
|
||||
|
||||
// responsible for packet retransmission upon fired timer
|
||||
pub(super) struct RetransmissionRequestListener<R>
|
||||
@@ -27,12 +28,14 @@ where
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
request_receiver: RetransmissionRequestReceiver,
|
||||
topology_access: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl<R> RetransmissionRequestListener<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
ack_key: Arc<AckKey>,
|
||||
ack_recipient: Recipient,
|
||||
@@ -41,6 +44,7 @@ where
|
||||
real_message_sender: BatchRealMessageSender,
|
||||
request_receiver: RetransmissionRequestReceiver,
|
||||
topology_access: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
RetransmissionRequestListener {
|
||||
ack_key,
|
||||
@@ -50,6 +54,7 @@ where
|
||||
real_message_sender,
|
||||
request_receiver,
|
||||
topology_access,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,9 +126,17 @@ where
|
||||
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started RetransmissionRequestListener");
|
||||
while let Some(timed_out_ack) = self.request_receiver.next().await {
|
||||
self.on_retransmission_request(timed_out_ack).await;
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
Some(timed_out_ack) = self.request_receiver.next() => {
|
||||
self.on_retransmission_request(timed_out_ack).await;
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("TODO: error msg. Or maybe panic?")
|
||||
log::debug!("RetransmissionRequestListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+14
-3
@@ -6,6 +6,7 @@ use super::SentPacketNotificationReceiver;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nymsphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
|
||||
use task::ShutdownListener;
|
||||
|
||||
/// Module responsible for starting up retransmission timers.
|
||||
/// It is required because when we send our packet to the `real traffic stream` controlled
|
||||
@@ -14,16 +15,19 @@ use nymsphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
|
||||
pub(super) struct SentNotificationListener {
|
||||
sent_notifier: SentPacketNotificationReceiver,
|
||||
action_sender: ActionSender,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl SentNotificationListener {
|
||||
pub(super) fn new(
|
||||
sent_notifier: SentPacketNotificationReceiver,
|
||||
action_sender: ActionSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
SentNotificationListener {
|
||||
sent_notifier,
|
||||
action_sender,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,9 +48,16 @@ impl SentNotificationListener {
|
||||
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started SentNotificationListener");
|
||||
while let Some(frag_id) = self.sent_notifier.next().await {
|
||||
self.on_sent_message(frag_id).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
Some(frag_id) = self.sent_notifier.next() => {
|
||||
self.on_sent_message(frag_id).await;
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("SentNotificationListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("TODO: error msg. Or maybe panic?")
|
||||
log::debug!("SentNotificationListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use nymsphinx::addressing::clients::Recipient;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task::ShutdownListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
mod acknowledgement_control;
|
||||
@@ -91,6 +92,7 @@ impl RealMessagesController<OsRng> {
|
||||
mix_sender: BatchMixMessageSender,
|
||||
topology_access: TopologyAccessor,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
@@ -119,6 +121,7 @@ impl RealMessagesController<OsRng> {
|
||||
config.self_recipient,
|
||||
reply_key_storage,
|
||||
ack_controller_connectors,
|
||||
shutdown.clone(),
|
||||
);
|
||||
|
||||
let out_queue_config = real_traffic_stream::Config::new(
|
||||
@@ -136,6 +139,7 @@ impl RealMessagesController<OsRng> {
|
||||
rng,
|
||||
config.self_recipient,
|
||||
topology_access,
|
||||
shutdown,
|
||||
);
|
||||
|
||||
RealMessagesController {
|
||||
@@ -153,12 +157,12 @@ impl RealMessagesController<OsRng> {
|
||||
// graceful shutdowns.
|
||||
let out_queue_control_fut = tokio::spawn(async move {
|
||||
out_queue_control.run_out_queue_control().await;
|
||||
error!("The out queue controller has finished execution!");
|
||||
debug!("The out queue controller has finished execution!");
|
||||
out_queue_control
|
||||
});
|
||||
let ack_control_fut = tokio::spawn(async move {
|
||||
ack_control.run().await;
|
||||
error!("The acknowledgement controller has finished execution!");
|
||||
debug!("The acknowledgement controller has finished execution!");
|
||||
ack_control
|
||||
});
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task::ShutdownListener;
|
||||
use tokio::time;
|
||||
|
||||
/// Configurable parameters of the `OutQueueControl`
|
||||
@@ -83,6 +84,9 @@ where
|
||||
|
||||
/// Buffer containing all real messages received. It is first exhausted before more are pulled.
|
||||
received_buffer: VecDeque<RealMessage>,
|
||||
|
||||
/// Listens for shutdown signals
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
pub(crate) struct RealMessage {
|
||||
@@ -174,6 +178,7 @@ where
|
||||
rng: R,
|
||||
our_full_destination: Recipient,
|
||||
topology_access: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
OutQueueControl {
|
||||
config,
|
||||
@@ -186,6 +191,7 @@ where
|
||||
rng,
|
||||
topology_access,
|
||||
received_buffer: VecDeque::with_capacity(0), // we won't be putting any data into this guy directly
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,7 +245,15 @@ where
|
||||
// - we run out of memory
|
||||
// - the receiver channel is closed
|
||||
// in either case there's no recovery and we can only panic
|
||||
self.mix_tx.unbounded_send(vec![next_message]).unwrap();
|
||||
if let Err(err) = self.mix_tx.unbounded_send(vec![next_message]) {
|
||||
if self.shutdown.is_shutdown_poll() {
|
||||
log::info!("Failed to send (shutdown detected)");
|
||||
} else {
|
||||
// We don't try to limp along, panic to avoid continuing in a potentially
|
||||
// inconsistent state
|
||||
panic!("{err}");
|
||||
}
|
||||
}
|
||||
|
||||
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
|
||||
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
|
||||
@@ -257,9 +271,19 @@ where
|
||||
self.config.average_message_sending_delay,
|
||||
)));
|
||||
|
||||
while let Some(next_message) = self.next().await {
|
||||
self.on_message(next_message).await;
|
||||
let mut shutdown = self.shutdown.clone();
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
}
|
||||
Some(next_message) = self.next() => {
|
||||
self.on_message(next_message).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
log::debug!("OutQueueControl: Exiting");
|
||||
}
|
||||
|
||||
pub(crate) async fn run_out_queue_control(&mut self) {
|
||||
|
||||
@@ -15,6 +15,7 @@ use nymsphinx::params::{ReplySurbEncryptionAlgorithm, ReplySurbKeyDigestAlgorith
|
||||
use nymsphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
// Buffer Requests to say "hey, send any reconstructed messages to this channel"
|
||||
@@ -292,16 +293,26 @@ impl RequestReceiver {
|
||||
|
||||
fn start(mut self) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
while let Some(request) = self.query_receiver.next().await {
|
||||
match request {
|
||||
ReceivedBufferMessage::ReceiverAnnounce(sender) => {
|
||||
self.received_buffer.connect_sender(sender).await;
|
||||
}
|
||||
ReceivedBufferMessage::ReceiverDisconnect => {
|
||||
self.received_buffer.disconnect_sender().await
|
||||
}
|
||||
}
|
||||
loop {
|
||||
tokio::select! {
|
||||
request = self.query_receiver.next() => {
|
||||
match request {
|
||||
Some(ReceivedBufferMessage::ReceiverAnnounce(sender)) => {
|
||||
self.received_buffer.connect_sender(sender).await;
|
||||
}
|
||||
Some(ReceivedBufferMessage::ReceiverDisconnect) => {
|
||||
self.received_buffer.disconnect_sender().await
|
||||
}
|
||||
None => {
|
||||
log::trace!("RequestReceiver: Stopping since channel closed");
|
||||
break;
|
||||
},
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
log::debug!("RequestReceiver: Exiting");
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -309,23 +320,40 @@ impl RequestReceiver {
|
||||
struct FragmentedMessageReceiver {
|
||||
received_buffer: ReceivedMessagesBuffer,
|
||||
mixnet_packet_receiver: MixnetMessageReceiver,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl FragmentedMessageReceiver {
|
||||
fn new(
|
||||
received_buffer: ReceivedMessagesBuffer,
|
||||
mixnet_packet_receiver: MixnetMessageReceiver,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
FragmentedMessageReceiver {
|
||||
received_buffer,
|
||||
mixnet_packet_receiver,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
fn start(mut self) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
while let Some(new_messages) = self.mixnet_packet_receiver.next().await {
|
||||
self.received_buffer.handle_new_received(new_messages).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
new_messages = self.mixnet_packet_receiver.next() => match new_messages {
|
||||
Some(new_messages) => {
|
||||
self.received_buffer.handle_new_received(new_messages).await;
|
||||
}
|
||||
None => {
|
||||
log::trace!("FragmentedMessageReceiver: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("FragmentedMessageReceiver: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("FragmentedMessageReceiver: Exiting");
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -341,6 +369,7 @@ impl ReceivedMessagesBufferController {
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
mixnet_packet_receiver: MixnetMessageReceiver,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
let received_buffer =
|
||||
ReceivedMessagesBuffer::new(local_encryption_keypair, reply_key_storage);
|
||||
@@ -349,6 +378,7 @@ impl ReceivedMessagesBufferController {
|
||||
fragmented_message_receiver: FragmentedMessageReceiver::new(
|
||||
received_buffer.clone(),
|
||||
mixnet_packet_receiver,
|
||||
shutdown,
|
||||
),
|
||||
request_receiver: RequestReceiver::new(received_buffer, query_receiver),
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time;
|
||||
use std::time::Duration;
|
||||
use task::ShutdownListener;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use tokio::task::JoinHandle;
|
||||
use topology::{nym_topology_from_bonds, NymTopology};
|
||||
@@ -303,12 +304,21 @@ impl TopologyRefresher {
|
||||
self.topology_accessor.is_routable().await
|
||||
}
|
||||
|
||||
pub fn start(mut self) -> JoinHandle<()> {
|
||||
pub fn start(mut self, mut shutdown: ShutdownListener) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(self.refresh_rate).await;
|
||||
self.refresh().await;
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(self.refresh_rate) => {
|
||||
self.refresh().await;
|
||||
},
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("TopologyRefresher: Received shutdown");
|
||||
},
|
||||
}
|
||||
}
|
||||
let a = SHUTDOWN_IS_SIGNALED;
|
||||
let a = crate::client::SHUTDOWN_IS_SIGNALLED.load(std::sync::atomic::Ordering::Relaxed);
|
||||
log::debug!("TopologyRefresher: Exiting");
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,19 +33,20 @@ tokio-tungstenite = "0.14" # websocket
|
||||
## internal
|
||||
client-core = { path = "../client-core" }
|
||||
coconut-interface = { path = "../../common/coconut-interface", optional = true }
|
||||
credentials = { path = "../../common/credentials", optional = true }
|
||||
credential-storage = { path = "../../common/credential-storage" }
|
||||
config = { path = "../../common/config" }
|
||||
credential-storage = { path = "../../common/credential-storage" }
|
||||
credentials = { path = "../../common/credentials", optional = true }
|
||||
crypto = { path = "../../common/crypto" }
|
||||
gateway-client = { path = "../../common/client-libs/gateway-client" }
|
||||
gateway-requests = { path = "../../gateway/gateway-requests" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
pemstore = { path = "../../common/pemstore" }
|
||||
task = { path = "../../common/task" }
|
||||
topology = { path = "../../common/topology" }
|
||||
websocket-requests = { path = "websocket-requests" }
|
||||
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
|
||||
version-checker = { path = "../../common/version-checker" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
websocket-requests = { path = "websocket-requests" }
|
||||
|
||||
[features]
|
||||
coconut = ["coconut-interface", "credentials", "credentials/coconut", "gateway-requests/coconut", "gateway-client/coconut", "client-core/coconut"]
|
||||
|
||||
@@ -32,6 +32,7 @@ use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::addressing::nodes::NodeIdentity;
|
||||
use nymsphinx::anonymous_replies::ReplySurb;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
|
||||
|
||||
use crate::client::config::{Config, SocketType};
|
||||
use crate::websocket;
|
||||
@@ -85,6 +86,7 @@ impl NymClient {
|
||||
&self,
|
||||
topology_accessor: TopologyAccessor,
|
||||
mix_tx: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting loop cover traffic stream...");
|
||||
|
||||
@@ -98,6 +100,7 @@ impl NymClient {
|
||||
mix_tx,
|
||||
self.as_mix_recipient(),
|
||||
topology_accessor,
|
||||
shutdown,
|
||||
)
|
||||
.start();
|
||||
}
|
||||
@@ -109,6 +112,7 @@ impl NymClient {
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let controller_config = real_messages_control::Config::new(
|
||||
self.key_manager.ack_key(),
|
||||
@@ -129,6 +133,7 @@ impl NymClient {
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
shutdown,
|
||||
)
|
||||
.start();
|
||||
}
|
||||
@@ -140,6 +145,7 @@ impl NymClient {
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
mixnet_receiver: MixnetMessageReceiver,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting received messages buffer controller...");
|
||||
ReceivedMessagesBufferController::new(
|
||||
@@ -147,6 +153,7 @@ impl NymClient {
|
||||
query_receiver,
|
||||
mixnet_receiver,
|
||||
reply_key_storage,
|
||||
shutdown,
|
||||
)
|
||||
.start()
|
||||
}
|
||||
@@ -155,6 +162,7 @@ impl NymClient {
|
||||
&mut self,
|
||||
mixnet_message_sender: MixnetMessageSender,
|
||||
ack_sender: AcknowledgementSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> GatewayClient {
|
||||
let gateway_id = self.config.get_base().get_gateway_id();
|
||||
if gateway_id.is_empty() {
|
||||
@@ -197,6 +205,7 @@ impl NymClient {
|
||||
ack_sender,
|
||||
self.config.get_base().get_gateway_response_timeout(),
|
||||
Some(bandwidth_controller),
|
||||
Some(shutdown),
|
||||
);
|
||||
|
||||
gateway_client
|
||||
@@ -212,7 +221,11 @@ impl NymClient {
|
||||
|
||||
// future responsible for periodically polling directory server and updating
|
||||
// the current global view of topology
|
||||
async fn start_topology_refresher(&mut self, topology_accessor: TopologyAccessor) {
|
||||
async fn start_topology_refresher(
|
||||
&mut self,
|
||||
topology_accessor: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let topology_refresher_config = TopologyRefresherConfig::new(
|
||||
self.config.get_base().get_validator_api_endpoints(),
|
||||
self.config.get_base().get_topology_refresh_rate(),
|
||||
@@ -234,7 +247,7 @@ impl NymClient {
|
||||
}
|
||||
|
||||
info!("Starting topology refresher...");
|
||||
topology_refresher.start();
|
||||
topology_refresher.start(shutdown);
|
||||
}
|
||||
|
||||
// controller for sending sphinx packets to mixnet (either real traffic or cover traffic)
|
||||
@@ -245,9 +258,10 @@ impl NymClient {
|
||||
&mut self,
|
||||
mix_rx: BatchMixMessageReceiver,
|
||||
gateway_client: GatewayClient,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting mix traffic controller...");
|
||||
MixTrafficController::new(mix_rx, gateway_client).start();
|
||||
MixTrafficController::new(mix_rx, gateway_client, shutdown).start();
|
||||
}
|
||||
|
||||
fn start_websocket_listener(
|
||||
@@ -308,20 +322,26 @@ impl NymClient {
|
||||
|
||||
/// blocking version of `start` method. Will run forever (or until SIGINT is sent)
|
||||
pub async fn run_forever(&mut self) {
|
||||
self.start().await;
|
||||
if let Err(e) = tokio::signal::ctrl_c().await {
|
||||
error!(
|
||||
"There was an error while capturing SIGINT - {:?}. We will terminate regardless",
|
||||
e
|
||||
);
|
||||
}
|
||||
let shutdown = self.start().await;
|
||||
wait_for_signal().await;
|
||||
|
||||
println!(
|
||||
"Received SIGINT - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
|
||||
"Received signal - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
|
||||
);
|
||||
|
||||
log::info!("Sending shutdown");
|
||||
shutdown.signal_shutdown().ok();
|
||||
|
||||
// Some of these components have shutdown signalling implemented as part of socks5 work,
|
||||
// but since it's not fully implemented (yet) for all the components of the native client,
|
||||
// we don't try to wait and instead just stop immediately.
|
||||
//log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
|
||||
//shutdown.wait_for_shutdown().await;
|
||||
|
||||
log::info!("Stopping nym-client");
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) {
|
||||
pub async fn start(&mut self) -> ShutdownNotifier {
|
||||
info!("Starting nym client");
|
||||
// channels for inter-component communication
|
||||
// TODO: make the channels be internally created by the relevant components
|
||||
@@ -351,30 +371,43 @@ impl NymClient {
|
||||
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
|
||||
.expect("Failed to load reply key storage!");
|
||||
|
||||
// Shutdown notifier for signalling tasks to stop
|
||||
let shutdown = ShutdownNotifier::default();
|
||||
|
||||
// the components are started in very specific order. Unless you know what you are doing,
|
||||
// do not change that.
|
||||
self.start_topology_refresher(shared_topology_accessor.clone())
|
||||
self.start_topology_refresher(shared_topology_accessor.clone(), shutdown.subscribe())
|
||||
.await;
|
||||
self.start_received_messages_buffer_controller(
|
||||
received_buffer_request_receiver,
|
||||
mixnet_messages_receiver,
|
||||
reply_key_storage.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
let gateway_client = self
|
||||
.start_gateway_client(mixnet_messages_sender, ack_sender)
|
||||
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
||||
.await;
|
||||
|
||||
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
|
||||
self.start_mix_traffic_controller(
|
||||
sphinx_message_receiver,
|
||||
gateway_client,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
self.start_cover_traffic_stream(shared_topology_accessor, sphinx_message_sender);
|
||||
self.start_cover_traffic_stream(
|
||||
shared_topology_accessor,
|
||||
sphinx_message_sender,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
match self.config.get_socket_type() {
|
||||
SocketType::WebSocket => {
|
||||
@@ -399,5 +432,7 @@ impl NymClient {
|
||||
|
||||
info!("Client startup finished!");
|
||||
info!("The address of this client is: {}", self.as_mix_recipient());
|
||||
|
||||
shutdown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,21 +26,22 @@ url = "2.2"
|
||||
# internal
|
||||
client-core = { path = "../client-core" }
|
||||
coconut-interface = { path = "../../common/coconut-interface", optional = true }
|
||||
credentials = { path = "../../common/credentials", optional = true }
|
||||
credential-storage = { path = "../../common/credential-storage" }
|
||||
config = { path = "../../common/config" }
|
||||
credential-storage = { path = "../../common/credential-storage" }
|
||||
credentials = { path = "../../common/credentials", optional = true }
|
||||
crypto = { path = "../../common/crypto" }
|
||||
gateway-client = { path = "../../common/client-libs/gateway-client" }
|
||||
gateway-requests = { path = "../../gateway/gateway-requests" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
ordered-buffer = { path = "../../common/socks5/ordered-buffer" }
|
||||
socks5-requests = { path = "../../common/socks5/requests" }
|
||||
topology = { path = "../../common/topology" }
|
||||
pemstore = { path = "../../common/pemstore" }
|
||||
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
|
||||
socks5-requests = { path = "../../common/socks5/requests" }
|
||||
task = { path = "../../common/task" }
|
||||
topology = { path = "../../common/topology" }
|
||||
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
|
||||
version-checker = { path = "../../common/version-checker" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
|
||||
[features]
|
||||
coconut = ["coconut-interface", "credentials", "gateway-requests/coconut", "gateway-client/coconut", "credentials/coconut", "client-core/coconut"]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||
use client_core::client::inbound_messages::{
|
||||
InputMessage, InputMessageReceiver, InputMessageSender,
|
||||
@@ -29,6 +31,7 @@ use gateway_client::{
|
||||
use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::addressing::nodes::NodeIdentity;
|
||||
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
|
||||
|
||||
use crate::client::config::Config;
|
||||
use crate::socks::{
|
||||
@@ -38,6 +41,8 @@ use crate::socks::{
|
||||
|
||||
pub mod config;
|
||||
|
||||
pub static SHUTDOWN_IS_SIGNALLED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
// Channels used to control the main task from outside
|
||||
pub type Socks5ControlMessageSender = mpsc::UnboundedSender<Socks5ControlMessage>;
|
||||
pub type Socks5ControlMessageReceiver = mpsc::UnboundedReceiver<Socks5ControlMessage>;
|
||||
@@ -84,6 +89,7 @@ impl NymClient {
|
||||
&self,
|
||||
topology_accessor: TopologyAccessor,
|
||||
mix_tx: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting loop cover traffic stream...");
|
||||
|
||||
@@ -97,6 +103,7 @@ impl NymClient {
|
||||
mix_tx,
|
||||
self.as_mix_recipient(),
|
||||
topology_accessor,
|
||||
shutdown,
|
||||
)
|
||||
.start();
|
||||
}
|
||||
@@ -108,6 +115,7 @@ impl NymClient {
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let controller_config = client_core::client::real_messages_control::Config::new(
|
||||
self.key_manager.ack_key(),
|
||||
@@ -128,6 +136,7 @@ impl NymClient {
|
||||
mix_sender,
|
||||
topology_accessor,
|
||||
reply_key_storage,
|
||||
shutdown,
|
||||
)
|
||||
.start();
|
||||
}
|
||||
@@ -139,6 +148,7 @@ impl NymClient {
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
mixnet_receiver: MixnetMessageReceiver,
|
||||
reply_key_storage: ReplyKeyStorage,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting received messages buffer controller...");
|
||||
ReceivedMessagesBufferController::new(
|
||||
@@ -146,6 +156,7 @@ impl NymClient {
|
||||
query_receiver,
|
||||
mixnet_receiver,
|
||||
reply_key_storage,
|
||||
shutdown,
|
||||
)
|
||||
.start()
|
||||
}
|
||||
@@ -154,6 +165,7 @@ impl NymClient {
|
||||
&mut self,
|
||||
mixnet_message_sender: MixnetMessageSender,
|
||||
ack_sender: AcknowledgementSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> GatewayClient {
|
||||
let gateway_id = self.config.get_base().get_gateway_id();
|
||||
if gateway_id.is_empty() {
|
||||
@@ -196,6 +208,7 @@ impl NymClient {
|
||||
ack_sender,
|
||||
self.config.get_base().get_gateway_response_timeout(),
|
||||
Some(bandwidth_controller),
|
||||
Some(shutdown),
|
||||
);
|
||||
|
||||
gateway_client
|
||||
@@ -211,7 +224,11 @@ impl NymClient {
|
||||
|
||||
// future responsible for periodically polling directory server and updating
|
||||
// the current global view of topology
|
||||
async fn start_topology_refresher(&mut self, topology_accessor: TopologyAccessor) {
|
||||
async fn start_topology_refresher(
|
||||
&mut self,
|
||||
topology_accessor: TopologyAccessor,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let topology_refresher_config = TopologyRefresherConfig::new(
|
||||
self.config.get_base().get_validator_api_endpoints(),
|
||||
self.config.get_base().get_topology_refresh_rate(),
|
||||
@@ -233,7 +250,7 @@ impl NymClient {
|
||||
}
|
||||
|
||||
info!("Starting topology refresher...");
|
||||
topology_refresher.start();
|
||||
topology_refresher.start(shutdown);
|
||||
}
|
||||
|
||||
// controller for sending sphinx packets to mixnet (either real traffic or cover traffic)
|
||||
@@ -244,15 +261,17 @@ impl NymClient {
|
||||
&mut self,
|
||||
mix_rx: BatchMixMessageReceiver,
|
||||
gateway_client: GatewayClient,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting mix traffic controller...");
|
||||
MixTrafficController::new(mix_rx, gateway_client).start();
|
||||
MixTrafficController::new(mix_rx, gateway_client, shutdown).start();
|
||||
}
|
||||
|
||||
fn start_socks5_listener(
|
||||
&self,
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
msg_input: InputMessageSender,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
info!("Starting socks5 listener...");
|
||||
let auth_methods = vec![AuthenticationMethods::NoAuth as u8];
|
||||
@@ -264,43 +283,55 @@ impl NymClient {
|
||||
authenticator,
|
||||
self.config.get_provider_mix_address(),
|
||||
self.as_mix_recipient(),
|
||||
shutdown,
|
||||
);
|
||||
tokio::spawn(async move { sphinx_socks.serve(msg_input, buffer_requester).await });
|
||||
}
|
||||
|
||||
/// blocking version of `start` method. Will run forever (or until SIGINT is sent)
|
||||
pub async fn run_forever(&mut self) {
|
||||
self.start().await;
|
||||
if let Err(e) = tokio::signal::ctrl_c().await {
|
||||
error!(
|
||||
"There was an error while capturing SIGINT - {:?}. We will terminate regardless",
|
||||
e
|
||||
);
|
||||
}
|
||||
let mut shutdown = self.start().await;
|
||||
wait_for_signal().await;
|
||||
|
||||
println!(
|
||||
"Received SIGINT - the client will terminate now (threads are not yet nicely stopped, if you see stack traces that's alright)."
|
||||
);
|
||||
log::info!("Sending shutdown");
|
||||
SHUTDOWN_IS_SIGNALLED.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
client_core::client::SHUTDOWN_IS_SIGNALLED.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
shutdown.signal_shutdown().ok();
|
||||
|
||||
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
|
||||
shutdown.wait_for_shutdown().await;
|
||||
|
||||
log::info!("Stopping nym-socks5-client");
|
||||
}
|
||||
|
||||
// Variant of `run_forever` that listends for remote control messages
|
||||
pub async fn run_and_listen(&mut self, mut receiver: Socks5ControlMessageReceiver) {
|
||||
self.start().await;
|
||||
let mut shutdown = self.start().await;
|
||||
tokio::select! {
|
||||
message = receiver.next() => {
|
||||
log::debug!("Received message: {:?}", message);
|
||||
match message {
|
||||
Some(Socks5ControlMessage::Stop) => {
|
||||
log::info!("Shutting down");
|
||||
log::info!("Graceful shutdown of tasks not yet implemented, you might see (harmless) panics until then");
|
||||
log::info!("Received stop message");
|
||||
}
|
||||
None => log::debug!("None"),
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received SIGINT");
|
||||
},
|
||||
}
|
||||
|
||||
log::info!("Sending shutdown");
|
||||
shutdown.signal_shutdown().ok();
|
||||
|
||||
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
|
||||
shutdown.wait_for_shutdown().await;
|
||||
|
||||
log::info!("Stopping nym-socks5-client");
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) {
|
||||
pub async fn start(&mut self) -> ShutdownNotifier {
|
||||
info!("Starting nym client");
|
||||
// channels for inter-component communication
|
||||
// TODO: make the channels be internally created by the relevant components
|
||||
@@ -330,33 +361,52 @@ impl NymClient {
|
||||
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
|
||||
.expect("Failed to load reply key storage!");
|
||||
|
||||
// Shutdown notifier for signalling tasks to stop
|
||||
let shutdown = ShutdownNotifier::default();
|
||||
|
||||
// the components are started in very specific order. Unless you know what you are doing,
|
||||
// do not change that.
|
||||
self.start_topology_refresher(shared_topology_accessor.clone())
|
||||
self.start_topology_refresher(shared_topology_accessor.clone(), shutdown.subscribe())
|
||||
.await;
|
||||
self.start_received_messages_buffer_controller(
|
||||
received_buffer_request_receiver,
|
||||
mixnet_messages_receiver,
|
||||
reply_key_storage.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
let gateway_client = self
|
||||
.start_gateway_client(mixnet_messages_sender, ack_sender)
|
||||
.start_gateway_client(mixnet_messages_sender, ack_sender, shutdown.subscribe())
|
||||
.await;
|
||||
|
||||
self.start_mix_traffic_controller(sphinx_message_receiver, gateway_client);
|
||||
self.start_mix_traffic_controller(
|
||||
sphinx_message_receiver,
|
||||
gateway_client,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
self.start_real_traffic_controller(
|
||||
shared_topology_accessor.clone(),
|
||||
reply_key_storage,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
sphinx_message_sender.clone(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
self.start_cover_traffic_stream(shared_topology_accessor, sphinx_message_sender);
|
||||
self.start_socks5_listener(received_buffer_request_sender, input_sender);
|
||||
self.start_cover_traffic_stream(
|
||||
shared_topology_accessor,
|
||||
sphinx_message_sender,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
self.start_socks5_listener(
|
||||
received_buffer_request_sender,
|
||||
input_sender,
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
info!("Client startup finished!");
|
||||
info!("The address of this client is: {}", self.as_mix_recipient());
|
||||
|
||||
shutdown
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use socks5_requests::{ConnectionId, Message, RemoteAddress, Request};
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use task::ShutdownListener;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio::{self, net::TcpStream};
|
||||
|
||||
@@ -140,6 +141,7 @@ pub(crate) struct SocksClient {
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
started_proxy: bool,
|
||||
shutdown_listener: ShutdownListener,
|
||||
}
|
||||
|
||||
impl Drop for SocksClient {
|
||||
@@ -163,6 +165,7 @@ impl SocksClient {
|
||||
service_provider: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
self_address: Recipient,
|
||||
shutdown_listener: ShutdownListener,
|
||||
) -> Self {
|
||||
let connection_id = Self::generate_random();
|
||||
SocksClient {
|
||||
@@ -176,6 +179,7 @@ impl SocksClient {
|
||||
service_provider,
|
||||
self_address,
|
||||
started_proxy: false,
|
||||
shutdown_listener,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,6 +254,7 @@ impl SocksClient {
|
||||
conn_receiver,
|
||||
input_sender,
|
||||
connection_id,
|
||||
self.shutdown_listener.clone(),
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
|
||||
|
||||
@@ -6,11 +6,13 @@ use log::*;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use proxy_helpers::connection_controller::{ControllerCommand, ControllerSender};
|
||||
use socks5_requests::Message;
|
||||
use task::ShutdownListener;
|
||||
|
||||
pub(crate) struct MixnetResponseListener {
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
mix_response_receiver: ReconstructedMessagesReceiver,
|
||||
controller_sender: ControllerSender,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl Drop for MixnetResponseListener {
|
||||
@@ -25,6 +27,7 @@ impl MixnetResponseListener {
|
||||
pub(crate) fn new(
|
||||
buffer_requester: ReceivedBufferRequestSender,
|
||||
controller_sender: ControllerSender,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
let (mix_response_sender, mix_response_receiver) = mpsc::unbounded();
|
||||
buffer_requester
|
||||
@@ -35,6 +38,7 @@ impl MixnetResponseListener {
|
||||
buffer_requester,
|
||||
mix_response_receiver,
|
||||
controller_sender,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,11 +77,19 @@ impl MixnetResponseListener {
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&mut self) {
|
||||
while let Some(received_responses) = self.mix_response_receiver.next().await {
|
||||
for reconstructed_message in received_responses {
|
||||
self.on_message(reconstructed_message).await;
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
Some(received_responses) = self.mix_response_receiver.next() => {
|
||||
for reconstructed_message in received_responses {
|
||||
self.on_message(reconstructed_message).await;
|
||||
}
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("MixnetResponseListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
error!("We should never see this message");
|
||||
let a = crate::client::SHUTDOWN_IS_SIGNALLED.load(std::sync::atomic::Ordering::Relaxed);
|
||||
log::debug!("MixnetResponseListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::Controller;
|
||||
use std::net::SocketAddr;
|
||||
use task::ShutdownListener;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// A Socks5 server that listens for connections.
|
||||
@@ -19,6 +20,7 @@ pub struct SphinxSocksServer {
|
||||
listening_address: SocketAddr,
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
shutdown: ShutdownListener,
|
||||
}
|
||||
|
||||
impl SphinxSocksServer {
|
||||
@@ -28,6 +30,7 @@ impl SphinxSocksServer {
|
||||
authenticator: Authenticator,
|
||||
service_provider: Recipient,
|
||||
self_address: Recipient,
|
||||
shutdown: ShutdownListener,
|
||||
) -> Self {
|
||||
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
|
||||
// just modify the config
|
||||
@@ -38,6 +41,7 @@ impl SphinxSocksServer {
|
||||
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
|
||||
service_provider,
|
||||
self_address,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,56 +62,66 @@ impl SphinxSocksServer {
|
||||
});
|
||||
|
||||
// listener for mix messages
|
||||
let mut mixnet_response_listener =
|
||||
MixnetResponseListener::new(buffer_requester, controller_sender.clone());
|
||||
|
||||
let mut mixnet_response_listener = MixnetResponseListener::new(
|
||||
buffer_requester,
|
||||
controller_sender.clone(),
|
||||
self.shutdown.clone(),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
mixnet_response_listener.run().await;
|
||||
});
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _remote)) = listener.accept().await {
|
||||
// TODO Optimize this
|
||||
let mut client = SocksClient::new(
|
||||
stream,
|
||||
self.authenticator.clone(),
|
||||
input_sender.clone(),
|
||||
self.service_provider,
|
||||
controller_sender.clone(),
|
||||
self.self_address,
|
||||
);
|
||||
tokio::select! {
|
||||
Ok((stream, _remote)) = listener.accept() => {
|
||||
// TODO Optimize this
|
||||
let mut client = SocksClient::new(
|
||||
stream,
|
||||
self.authenticator.clone(),
|
||||
input_sender.clone(),
|
||||
self.service_provider,
|
||||
controller_sender.clone(),
|
||||
self.self_address,
|
||||
self.shutdown.clone(),
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
{
|
||||
match client.run().await {
|
||||
Ok(_) => {}
|
||||
Err(error) => {
|
||||
error!("Error! {}", error);
|
||||
let error_text = format!("{}", error);
|
||||
tokio::spawn(async move {
|
||||
{
|
||||
match client.run().await {
|
||||
Ok(_) => {}
|
||||
Err(error) => {
|
||||
error!("Error! {}", error);
|
||||
let error_text = format!("{}", error);
|
||||
|
||||
let response: ResponseCode;
|
||||
let response: ResponseCode;
|
||||
|
||||
if error_text.contains("Host") {
|
||||
response = ResponseCode::HostUnreachable;
|
||||
} else if error_text.contains("Network") {
|
||||
response = ResponseCode::NetworkUnreachable;
|
||||
} else if error_text.contains("ttl") {
|
||||
response = ResponseCode::TtlExpired
|
||||
} else {
|
||||
response = ResponseCode::Failure
|
||||
if error_text.contains("Host") {
|
||||
response = ResponseCode::HostUnreachable;
|
||||
} else if error_text.contains("Network") {
|
||||
response = ResponseCode::NetworkUnreachable;
|
||||
} else if error_text.contains("ttl") {
|
||||
response = ResponseCode::TtlExpired
|
||||
} else {
|
||||
response = ResponseCode::Failure
|
||||
}
|
||||
|
||||
if client.error(response).await.is_err() {
|
||||
warn!("Failed to send error code");
|
||||
};
|
||||
if client.shutdown().await.is_err() {
|
||||
warn!("Failed to shutdown TcpStream");
|
||||
};
|
||||
}
|
||||
|
||||
if client.error(response).await.is_err() {
|
||||
warn!("Failed to send error code");
|
||||
};
|
||||
if client.shutdown().await.is_err() {
|
||||
warn!("Failed to shutdown TcpStream");
|
||||
};
|
||||
}
|
||||
};
|
||||
// client gets dropped here
|
||||
}
|
||||
});
|
||||
};
|
||||
// client gets dropped here
|
||||
}
|
||||
});
|
||||
},
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("SphinxSocksServer: Received shutdown");
|
||||
log::debug!("SphinxSocksServer: Exiting");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,13 +20,14 @@ web3 = { version = "0.17.0", default-features = false }
|
||||
async-trait = { version = "0.1.51" }
|
||||
|
||||
# internal
|
||||
coconut-interface = { path = "../../coconut-interface", optional = true }
|
||||
credentials = { path = "../../credentials" }
|
||||
crypto = { path = "../../crypto" }
|
||||
gateway-requests = { path = "../../../gateway/gateway-requests" }
|
||||
network-defaults = { path = "../../network-defaults" }
|
||||
nymsphinx = { path = "../../nymsphinx" }
|
||||
pemstore = { path = "../../pemstore" }
|
||||
coconut-interface = { path = "../../coconut-interface", optional = true }
|
||||
network-defaults = { path = "../../network-defaults" }
|
||||
task = { path = "../../task" }
|
||||
validator-client = { path = "../validator-client", optional = true }
|
||||
|
||||
[dependencies.tungstenite]
|
||||
@@ -38,6 +39,10 @@ default-features = false
|
||||
version = "1.19.1"
|
||||
features = ["macros", "rt", "net", "sync", "time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
|
||||
version = "0.1.9"
|
||||
features = ["net", "sync", "time"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-tungstenite]
|
||||
version = "0.14"
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ use rand::rngs::OsRng;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task::ShutdownListener;
|
||||
use tungstenite::protocol::Message;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -65,6 +66,10 @@ pub struct GatewayClient {
|
||||
reconnection_attempts: usize,
|
||||
/// Delay between each subsequent reconnection attempt.
|
||||
reconnection_backoff: Duration,
|
||||
|
||||
/// Listen to shutdown messages. This is an option since we don't require it when for example
|
||||
/// when doing initial authentication.
|
||||
shutdown: Option<ShutdownListener>,
|
||||
}
|
||||
|
||||
impl GatewayClient {
|
||||
@@ -80,6 +85,7 @@ impl GatewayClient {
|
||||
ack_sender: AcknowledgementSender,
|
||||
response_timeout_duration: Duration,
|
||||
bandwidth_controller: Option<BandwidthController<PersistentStorage>>,
|
||||
shutdown: Option<ShutdownListener>,
|
||||
) -> Self {
|
||||
GatewayClient {
|
||||
authenticated: false,
|
||||
@@ -97,6 +103,7 @@ impl GatewayClient {
|
||||
should_reconnect_on_failure: true,
|
||||
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
|
||||
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
|
||||
shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,6 +155,7 @@ impl GatewayClient {
|
||||
should_reconnect_on_failure: false,
|
||||
reconnection_attempts: DEFAULT_RECONNECTION_ATTEMPTS,
|
||||
reconnection_backoff: DEFAULT_RECONNECTION_BACKOFF,
|
||||
shutdown: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,7 +306,7 @@ impl GatewayClient {
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -723,6 +731,7 @@ impl GatewayClient {
|
||||
.as_ref()
|
||||
.expect("no shared key present even though we're authenticated!"),
|
||||
),
|
||||
self.shutdown.clone(),
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
|
||||
@@ -11,6 +11,7 @@ use gateway_requests::registration::handshake::SharedKeys;
|
||||
use gateway_requests::BinaryResponse;
|
||||
use log::*;
|
||||
use std::sync::Arc;
|
||||
use task::ShutdownListener;
|
||||
use tungstenite::Message;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -86,6 +87,7 @@ impl PartiallyDelegated {
|
||||
conn: WsConn,
|
||||
packet_router: PacketRouter,
|
||||
shared_key: Arc<SharedKeys>,
|
||||
shutdown: Option<ShutdownListener>,
|
||||
) -> Self {
|
||||
// when called for, it NEEDS TO yield back the stream so that we could merge it and
|
||||
// read control request responses.
|
||||
@@ -98,9 +100,15 @@ impl PartiallyDelegated {
|
||||
let mut fused_receiver = notify_receiver.fuse();
|
||||
let mut fused_stream = (&mut stream).fuse();
|
||||
|
||||
// Bit of an ugly workaround for selecting on an `Option`. The unwrap is lazy so we use
|
||||
// this bool inside the `select` to guard against unwrapping in the `None` case.
|
||||
let shutdown_is_some = shutdown.is_some();
|
||||
let shutdown_recv_lazy = async { shutdown.unwrap().recv().await };
|
||||
tokio::pin!(shutdown_recv_lazy);
|
||||
|
||||
let ret_err = loop {
|
||||
futures::select! {
|
||||
_ = fused_receiver => {
|
||||
tokio::select! {
|
||||
_ = &mut fused_receiver => {
|
||||
break Ok(());
|
||||
}
|
||||
msg = fused_stream.next() => {
|
||||
@@ -110,6 +118,11 @@ impl PartiallyDelegated {
|
||||
};
|
||||
Self::route_socket_message(ws_msg, &packet_router, shared_key.as_ref());
|
||||
}
|
||||
_ = &mut shutdown_recv_lazy, if shutdown_is_some => {
|
||||
log::trace!("GatewayClient listener: Received shutdown");
|
||||
log::debug!("GatewayClient listener: Exiting");
|
||||
return;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ futures = "0.3"
|
||||
log = "0.4"
|
||||
socks5-requests = { path = "../requests" }
|
||||
ordered-buffer = { path = "../ordered-buffer" }
|
||||
task = { path = "../../task" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4.2"
|
||||
|
||||
@@ -170,16 +170,23 @@ impl Controller {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
while let Some(command) = self.receiver.next().await {
|
||||
match command {
|
||||
ControllerCommand::Send(conn_id, data, is_closed) => {
|
||||
self.send_to_connection(conn_id, data, is_closed)
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = self.receiver.next() => match command {
|
||||
Some(ControllerCommand::Send(conn_id, data, is_closed)) => {
|
||||
self.send_to_connection(conn_id, data, is_closed)
|
||||
}
|
||||
Some(ControllerCommand::Insert(conn_id, sender)) => {
|
||||
self.insert_connection(conn_id, sender)
|
||||
}
|
||||
Some(ControllerCommand::Remove(conn_id)) => self.remove_connection(conn_id),
|
||||
None => {
|
||||
log::trace!("SOCKS5 Controller: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
ControllerCommand::Insert(conn_id, sender) => {
|
||||
self.insert_connection(conn_id, sender)
|
||||
}
|
||||
ControllerCommand::Remove(conn_id) => self.remove_connection(conn_id),
|
||||
}
|
||||
}
|
||||
log::debug!("SOCKS5 Controller: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use log::*;
|
||||
use ordered_buffer::OrderedMessageSender;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{io, sync::Arc};
|
||||
use task::ShutdownListener;
|
||||
use tokio::select;
|
||||
use tokio::{net::tcp::OwnedReadHalf, sync::Notify, time::sleep};
|
||||
|
||||
@@ -74,6 +75,7 @@ where
|
||||
is_finished
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn run_inbound<F, S>(
|
||||
mut reader: OwnedReadHalf,
|
||||
local_destination_address: String, // addresses are provided for better logging
|
||||
@@ -82,6 +84,7 @@ pub(super) async fn run_inbound<F, S>(
|
||||
mix_sender: MixProxySender<S>,
|
||||
adapter_fn: F,
|
||||
shutdown_notify: Arc<Notify>,
|
||||
mut shutdown_listener: ShutdownListener,
|
||||
) -> OwnedReadHalf
|
||||
where
|
||||
F: Fn(ConnectionId, Vec<u8>, bool) -> S + Send + 'static,
|
||||
@@ -106,6 +109,10 @@ where
|
||||
send_empty_close(connection_id, &mut message_sender, &mix_sender, &adapter_fn);
|
||||
break;
|
||||
}
|
||||
_ = shutdown_listener.recv() => {
|
||||
log::trace!("ProxyRunner inbound: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("{} - inbound closed", connection_id);
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::connection_controller::ConnectionReceiver;
|
||||
use futures::channel::mpsc;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use task::ShutdownListener;
|
||||
use tokio::{net::TcpStream, sync::Notify};
|
||||
|
||||
mod inbound;
|
||||
@@ -44,6 +45,9 @@ pub struct ProxyRunner<S> {
|
||||
local_destination_address: String,
|
||||
remote_source_address: String,
|
||||
connection_id: ConnectionId,
|
||||
|
||||
// Listens to shutdown commands from higher up
|
||||
shutdown_listener: ShutdownListener,
|
||||
}
|
||||
|
||||
impl<S> ProxyRunner<S>
|
||||
@@ -57,6 +61,7 @@ where
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: MixProxySender<S>,
|
||||
connection_id: ConnectionId,
|
||||
shutdown_listener: ShutdownListener,
|
||||
) -> Self {
|
||||
ProxyRunner {
|
||||
mix_receiver: Some(mix_receiver),
|
||||
@@ -65,6 +70,7 @@ where
|
||||
local_destination_address,
|
||||
remote_source_address,
|
||||
connection_id,
|
||||
shutdown_listener,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,6 +92,7 @@ where
|
||||
self.mix_sender.clone(),
|
||||
adapter_fn,
|
||||
Arc::clone(&shutdown_notify),
|
||||
self.shutdown_listener.clone(),
|
||||
);
|
||||
|
||||
let outbound_future = outbound::run_outbound(
|
||||
@@ -95,6 +102,7 @@ where
|
||||
self.mix_receiver.take().unwrap(),
|
||||
self.connection_id,
|
||||
shutdown_notify,
|
||||
self.shutdown_listener.clone(),
|
||||
);
|
||||
|
||||
// TODO: this shouldn't really have to spawn tasks inside "library" code, but
|
||||
|
||||
@@ -8,6 +8,7 @@ use futures::StreamExt;
|
||||
use log::*;
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use task::ShutdownListener;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::select;
|
||||
use tokio::{net::tcp::OwnedWriteHalf, sync::Notify, time::sleep, time::Instant};
|
||||
@@ -50,6 +51,7 @@ pub(super) async fn run_outbound(
|
||||
mut mix_receiver: ConnectionReceiver,
|
||||
connection_id: ConnectionId,
|
||||
shutdown_notify: Arc<Notify>,
|
||||
mut shutdown_listener: ShutdownListener,
|
||||
) -> (OwnedWriteHalf, ConnectionReceiver) {
|
||||
let shutdown_future = shutdown_notify.notified().then(|_| sleep(SHUTDOWN_TIMEOUT));
|
||||
tokio::pin!(shutdown_future);
|
||||
@@ -78,6 +80,10 @@ pub(super) async fn run_outbound(
|
||||
debug!("closing outbound proxy after inbound was closed {:?} ago", SHUTDOWN_TIMEOUT);
|
||||
break;
|
||||
}
|
||||
_ = shutdown_listener.recv() => {
|
||||
log::trace!("ProxyRunner outbound: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,5 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub mod shutdown;
|
||||
pub mod signal;
|
||||
|
||||
pub use shutdown::{ShutdownListener, ShutdownNotifier};
|
||||
pub use signal::wait_for_signal;
|
||||
|
||||
@@ -92,6 +92,25 @@ impl ShutdownListener {
|
||||
let _ = self.notify.changed().await;
|
||||
self.shutdown = true;
|
||||
}
|
||||
|
||||
pub fn is_shutdown_poll(&mut self) -> bool {
|
||||
if self.shutdown {
|
||||
return true;
|
||||
}
|
||||
match self.notify.has_changed() {
|
||||
Ok(has_changed) => {
|
||||
if has_changed {
|
||||
self.shutdown = true;
|
||||
}
|
||||
has_changed
|
||||
}
|
||||
Err(err) => {
|
||||
log::debug!("Polling shutdown failed: {err}");
|
||||
log::debug!("Assuming this means we should shutdown...");
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
#[cfg(unix)]
|
||||
pub async fn wait_for_signal() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
|
||||
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received SIGINT");
|
||||
},
|
||||
_ = sigterm.recv() => {
|
||||
log::info!("Received SIGTERM");
|
||||
}
|
||||
_ = sigquit.recv() => {
|
||||
log::info!("Received SIGQUIT");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
pub async fn wait_for_signal() {
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received SIGINT");
|
||||
},
|
||||
}
|
||||
}
|
||||
+1
-29
@@ -26,7 +26,7 @@ use rand::thread_rng;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use std::sync::Arc;
|
||||
use task::{ShutdownListener, ShutdownNotifier};
|
||||
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
|
||||
use version_checker::parse_version;
|
||||
|
||||
mod http;
|
||||
@@ -334,31 +334,3 @@ impl MixNode {
|
||||
self.wait_for_interrupt(shutdown).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn wait_for_signal() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
|
||||
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received SIGINT");
|
||||
},
|
||||
_ = sigterm.recv() => {
|
||||
log::info!("Received SIGTERM");
|
||||
}
|
||||
_ = sigquit.recv() => {
|
||||
log::info!("Received SIGQUIT");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
async fn wait_for_signal() {
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Received SIGINT");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Generated
+15
@@ -630,6 +630,7 @@ dependencies = [
|
||||
"serde",
|
||||
"sled",
|
||||
"tap",
|
||||
"task",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"topology",
|
||||
@@ -1921,8 +1922,10 @@ dependencies = [
|
||||
"pemstore",
|
||||
"rand 0.7.3",
|
||||
"secp256k1",
|
||||
"task",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-tungstenite",
|
||||
"tungstenite",
|
||||
"url",
|
||||
@@ -3461,6 +3464,7 @@ dependencies = [
|
||||
"serde",
|
||||
"snafu",
|
||||
"socks5-requests",
|
||||
"task",
|
||||
"tokio",
|
||||
"topology",
|
||||
"url",
|
||||
@@ -4268,6 +4272,7 @@ dependencies = [
|
||||
"log",
|
||||
"ordered-buffer",
|
||||
"socks5-requests",
|
||||
"task",
|
||||
"tokio",
|
||||
"tokio-util 0.7.3",
|
||||
]
|
||||
@@ -5626,6 +5631,15 @@ dependencies = [
|
||||
"xattr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "task"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"tokio",
|
||||
"tokio-util 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tauri"
|
||||
version = "1.0.3"
|
||||
@@ -6072,6 +6086,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -34,4 +34,5 @@ ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
|
||||
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
|
||||
socks5-requests = { path = "../../common/socks5/requests" }
|
||||
statistics-common = { path = "../../common/statistics" }
|
||||
task = { path = "../../common/task" }
|
||||
websocket-requests = { path = "../../clients/native/websocket-requests" }
|
||||
|
||||
@@ -7,6 +7,7 @@ use proxy_helpers::connection_controller::ConnectionReceiver;
|
||||
use proxy_helpers::proxy_runner::ProxyRunner;
|
||||
use socks5_requests::{ConnectionId, Message as Socks5Message, RemoteAddress, Response};
|
||||
use std::io;
|
||||
use task::ShutdownListener;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
/// A TCP connection between the Socks5 service provider, which makes
|
||||
@@ -40,6 +41,7 @@ impl Connection {
|
||||
&mut self,
|
||||
mix_receiver: ConnectionReceiver,
|
||||
mix_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let stream = self.conn.take().unwrap();
|
||||
let remote_source_address = "???".to_string(); // we don't know ip address of requester
|
||||
@@ -52,6 +54,7 @@ impl Connection {
|
||||
mix_receiver,
|
||||
mix_sender,
|
||||
connection_id,
|
||||
shutdown,
|
||||
)
|
||||
.run(move |conn_id, read_data, socket_closed| {
|
||||
(
|
||||
|
||||
@@ -19,6 +19,7 @@ use socks5_requests::{
|
||||
use statistics_common::collector::StatisticsSender;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use task::ShutdownListener;
|
||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
||||
use websocket::WebsocketConnectionError;
|
||||
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
|
||||
@@ -134,6 +135,7 @@ impl ServiceProvider {
|
||||
return_address: Recipient,
|
||||
controller_sender: ControllerSender,
|
||||
mix_input_sender: mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await {
|
||||
Ok(conn) => conn,
|
||||
@@ -170,7 +172,8 @@ impl ServiceProvider {
|
||||
);
|
||||
|
||||
// run the proxy on the connection
|
||||
conn.run_proxy(mix_receiver, mix_input_sender).await;
|
||||
conn.run_proxy(mix_receiver, mix_input_sender, shutdown)
|
||||
.await;
|
||||
|
||||
// proxy is done - remove the access channel from the controller
|
||||
controller_sender
|
||||
@@ -192,6 +195,7 @@ impl ServiceProvider {
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
return_address: Recipient,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) {
|
||||
let log_msg = format!("Domain {:?} failed filter check", remote_addr);
|
||||
@@ -218,6 +222,7 @@ impl ServiceProvider {
|
||||
return_address,
|
||||
controller_sender_clone,
|
||||
mix_input_sender_clone,
|
||||
shutdown,
|
||||
)
|
||||
.await
|
||||
});
|
||||
@@ -241,6 +246,7 @@ impl ServiceProvider {
|
||||
controller_sender: &mut ControllerSender,
|
||||
mix_input_sender: &mpsc::UnboundedSender<(Socks5Message, Recipient)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
shutdown: ShutdownListener,
|
||||
) {
|
||||
let deserialized_msg = match Socks5Message::try_from_bytes(raw_request) {
|
||||
Ok(msg) => msg,
|
||||
@@ -265,6 +271,7 @@ impl ServiceProvider {
|
||||
req.conn_id,
|
||||
req.remote_addr,
|
||||
req.return_address,
|
||||
shutdown,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -302,7 +309,10 @@ impl ServiceProvider {
|
||||
let (mix_input_sender, mix_input_receiver) =
|
||||
mpsc::unbounded::<(Socks5Message, Recipient)>();
|
||||
|
||||
// controller for managing all active connections
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
// requester shutdown signalling is not yet fully implemented.
|
||||
let shutdown = task::ShutdownNotifier::default();
|
||||
let (mut active_connections_controller, mut controller_sender) = Controller::new();
|
||||
tokio::spawn(async move {
|
||||
active_connections_controller.run().await;
|
||||
@@ -353,6 +363,7 @@ impl ServiceProvider {
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -209,6 +209,7 @@ impl PacketSender {
|
||||
// currently we do not care about acks at all, but we must keep the channel alive
|
||||
// so that the gateway client would not crash
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
|
||||
let mut gateway_client = GatewayClient::new(
|
||||
address,
|
||||
Arc::clone(&fresh_gateway_client_data.local_identity),
|
||||
@@ -219,6 +220,7 @@ impl PacketSender {
|
||||
ack_sender,
|
||||
fresh_gateway_client_data.gateway_response_timeout,
|
||||
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
|
||||
None,
|
||||
);
|
||||
|
||||
gateway_client
|
||||
|
||||
Reference in New Issue
Block a user