Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 228380b070 | |||
| 8824a875d9 |
Generated
+9
-9
@@ -2864,7 +2864,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "explorer-api"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap 4.4.7",
|
||||
@@ -5764,7 +5764,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-api"
|
||||
version = "1.1.34"
|
||||
version = "1.1.35"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"anyhow",
|
||||
@@ -5922,7 +5922,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-cli"
|
||||
version = "1.1.33"
|
||||
version = "1.1.34"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.13.1",
|
||||
@@ -5998,7 +5998,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"clap 4.4.7",
|
||||
"dirs 4.0.0",
|
||||
@@ -6347,7 +6347,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-gateway"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -6567,7 +6567,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-mixnode"
|
||||
version = "1.1.34"
|
||||
version = "1.1.35"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
@@ -6687,7 +6687,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-requester"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"addr",
|
||||
"anyhow",
|
||||
@@ -6736,7 +6736,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-statistics"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"dirs 4.0.0",
|
||||
"log",
|
||||
@@ -6981,7 +6981,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.32"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"clap 4.4.7",
|
||||
"lazy_static",
|
||||
|
||||
@@ -290,6 +290,7 @@ where
|
||||
shutdown: TaskClient,
|
||||
packet_type: PacketType,
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
counter_receiver: mpsc::Receiver<u8>,
|
||||
) {
|
||||
info!("Starting real traffic stream...");
|
||||
|
||||
@@ -305,12 +306,14 @@ where
|
||||
lane_queue_lengths,
|
||||
client_connection_rx,
|
||||
stats_tx,
|
||||
counter_receiver,
|
||||
)
|
||||
.start_with_shutdown(shutdown, packet_type);
|
||||
}
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
// required so that other components would be able to use them (say the websocket)
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn start_received_messages_buffer_controller(
|
||||
local_encryption_keypair: Arc<encryption::KeyPair>,
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
@@ -319,6 +322,7 @@ where
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
shutdown: TaskClient,
|
||||
packet_statistics_control: PacketStatisticsReporter,
|
||||
counter_sender: mpsc::Sender<u8>,
|
||||
) {
|
||||
info!("Starting received messages buffer controller...");
|
||||
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
|
||||
@@ -329,6 +333,7 @@ where
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
packet_statistics_control,
|
||||
counter_sender,
|
||||
);
|
||||
controller.start_with_shutdown(shutdown)
|
||||
}
|
||||
@@ -606,6 +611,9 @@ where
|
||||
// channels responsible for controlling real messages
|
||||
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
|
||||
|
||||
// drop cover traffic counter channel
|
||||
let (counter_sender, counter_receiver) = mpsc::channel::<u8>(10000);
|
||||
|
||||
// channels responsible for controlling ack messages
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
let shared_topology_accessor = TopologyAccessor::new();
|
||||
@@ -681,6 +689,7 @@ where
|
||||
reply_controller_sender.clone(),
|
||||
shutdown.fork("received_messages_buffer"),
|
||||
packet_stats_reporter.clone(),
|
||||
counter_sender,
|
||||
);
|
||||
|
||||
// The message_sender is the transmitter for any component generating sphinx packets
|
||||
@@ -720,6 +729,7 @@ where
|
||||
shutdown.fork("real_traffic_controller"),
|
||||
self.config.debug.traffic.packet_type,
|
||||
packet_stats_reporter.clone(),
|
||||
counter_receiver,
|
||||
);
|
||||
|
||||
if !self
|
||||
|
||||
@@ -146,6 +146,7 @@ impl RealMessagesController<OsRng> {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
client_connection_rx: ConnectionCommandReceiver,
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
counter_receiver: mpsc::Receiver<u8>,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
@@ -204,6 +205,7 @@ impl RealMessagesController<OsRng> {
|
||||
lane_queue_lengths,
|
||||
client_connection_rx,
|
||||
stats_tx,
|
||||
counter_receiver,
|
||||
);
|
||||
|
||||
RealMessagesController {
|
||||
|
||||
@@ -8,13 +8,14 @@ use crate::client::real_messages_control::acknowledgement_control::SentPacketNot
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
use crate::config;
|
||||
use futures::channel::mpsc;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use log::*;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::chunking::fragment::FragmentIdentifier;
|
||||
use nym_sphinx::cover::generate_loop_cover_packet;
|
||||
use nym_sphinx::cover::{generate_drop_cover_packet, generate_loop_cover_packet};
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::params::PacketSize;
|
||||
use nym_sphinx::preparer::PreparedFragment;
|
||||
@@ -117,6 +118,8 @@ where
|
||||
|
||||
/// Channel used for sending statistics events to `PacketStatisticsControl`.
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
//counter for drop cover traffic
|
||||
counter_receiver: mpsc::Receiver<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -155,7 +158,7 @@ pub(crate) type BatchRealMessageSender =
|
||||
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<(Vec<RealMessage>, TransmissionLane)>;
|
||||
|
||||
pub(crate) enum StreamMessage {
|
||||
Cover,
|
||||
Cover(bool),
|
||||
Real(Box<RealMessage>),
|
||||
}
|
||||
|
||||
@@ -176,6 +179,7 @@ where
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
client_connection_rx: ConnectionCommandReceiver,
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
counter_receiver: mpsc::Receiver<u8>,
|
||||
) -> Self {
|
||||
OutQueueControl {
|
||||
config,
|
||||
@@ -190,6 +194,7 @@ where
|
||||
client_connection_rx,
|
||||
lane_queue_lengths,
|
||||
stats_tx,
|
||||
counter_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,7 +226,7 @@ where
|
||||
trace!("created new message");
|
||||
|
||||
let (next_message, fragment_id, packet_size) = match next_message {
|
||||
StreamMessage::Cover => {
|
||||
StreamMessage::Cover(drop) => {
|
||||
let cover_traffic_packet_size = self.loop_cover_message_size();
|
||||
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
|
||||
|
||||
@@ -240,24 +245,44 @@ where
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
(
|
||||
generate_loop_cover_packet(
|
||||
&mut self.rng,
|
||||
topology_ref,
|
||||
&self.config.ack_key,
|
||||
&self.config.our_full_destination,
|
||||
self.config.average_ack_delay,
|
||||
self.config.traffic.average_packet_delay,
|
||||
cover_traffic_packet_size,
|
||||
self.config.traffic.packet_type,
|
||||
if drop {
|
||||
debug!("Sending a drop cover message");
|
||||
(
|
||||
generate_drop_cover_packet(
|
||||
&mut self.rng,
|
||||
topology_ref,
|
||||
&self.config.ack_key,
|
||||
&self.config.our_full_destination,
|
||||
self.config.average_ack_delay,
|
||||
self.config.traffic.average_packet_delay,
|
||||
cover_traffic_packet_size,
|
||||
self.config.traffic.packet_type,
|
||||
)
|
||||
.expect(
|
||||
"Somehow failed to generate a drop cover message with a valid topology",
|
||||
),
|
||||
None,
|
||||
cover_traffic_packet_size.size(),
|
||||
)
|
||||
.expect(
|
||||
"Somehow failed to generate a loop cover message with a valid topology",
|
||||
),
|
||||
None,
|
||||
cover_traffic_packet_size.size(),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
generate_loop_cover_packet(
|
||||
&mut self.rng,
|
||||
topology_ref,
|
||||
&self.config.ack_key,
|
||||
&self.config.our_full_destination,
|
||||
self.config.average_ack_delay,
|
||||
self.config.traffic.average_packet_delay,
|
||||
cover_traffic_packet_size,
|
||||
self.config.traffic.packet_type,
|
||||
)
|
||||
.expect(
|
||||
"Somehow failed to generate a loop cover message with a valid topology",
|
||||
),
|
||||
None,
|
||||
cover_traffic_packet_size.size(),
|
||||
)
|
||||
}
|
||||
}
|
||||
StreamMessage::Real(real_message) => {
|
||||
let packet_size = real_message.packet_size();
|
||||
@@ -385,6 +410,10 @@ where
|
||||
}
|
||||
|
||||
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||
//For instantaneous drop cover traffic
|
||||
// if let Ok(_) = self.counter_receiver.try_next() {
|
||||
// return Poll::Ready(Some(StreamMessage::Cover(true)));
|
||||
// };
|
||||
// The average delay could change depending on if backpressure in the downstream channel
|
||||
// (mix_tx) was detected.
|
||||
self.adjust_current_average_message_sending_delay();
|
||||
@@ -419,6 +448,7 @@ where
|
||||
// in `Vec`, this ensures that on average we will fetch messages faster than we can
|
||||
// send, which is a condition for being able to multiplex packets from multiple
|
||||
// data streams.
|
||||
let need_drop = self.counter_receiver.try_next();
|
||||
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
|
||||
// in the case our real message channel stream was closed, we should also indicate we are closed
|
||||
// (and whoever is using the stream should panic)
|
||||
@@ -438,7 +468,10 @@ where
|
||||
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
|
||||
} else {
|
||||
// otherwise construct a dummy one
|
||||
Poll::Ready(Some(StreamMessage::Cover))
|
||||
match need_drop {
|
||||
Ok(_) => Poll::Ready(Some(StreamMessage::Cover(true))),
|
||||
_ => Poll::Ready(Some(StreamMessage::Cover(false))),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
|
||||
recently_reconstructed: HashSet<i32>,
|
||||
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
counter_sender: mpsc::Sender<u8>,
|
||||
}
|
||||
|
||||
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
@@ -71,6 +72,11 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
|
||||
.report(PacketStatisticsEvent::RealPacketReceived(
|
||||
fragment_data_size,
|
||||
));
|
||||
if self.counter_sender.try_send(1).is_err() {
|
||||
{
|
||||
log::debug!("Failed to send counter message");
|
||||
}
|
||||
};
|
||||
|
||||
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
|
||||
Err(err) => {
|
||||
@@ -164,6 +170,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
reply_key_storage: SentReplyKeys,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
stats_tx: PacketStatisticsReporter,
|
||||
counter_sender: mpsc::Sender<u8>,
|
||||
) -> Self {
|
||||
ReceivedMessagesBuffer {
|
||||
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
|
||||
@@ -173,6 +180,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
message_sender: None,
|
||||
recently_reconstructed: HashSet::new(),
|
||||
stats_tx,
|
||||
counter_sender,
|
||||
})),
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
@@ -505,12 +513,14 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
|
||||
reply_key_storage: SentReplyKeys,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_statistics_reporter: PacketStatisticsReporter,
|
||||
counter_sender: mpsc::Sender<u8>,
|
||||
) -> Self {
|
||||
let received_buffer = ReceivedMessagesBuffer::new(
|
||||
local_encryption_keypair,
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
packet_statistics_reporter,
|
||||
counter_sender,
|
||||
);
|
||||
|
||||
ReceivedMessagesBufferController {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_crypto::shared_key::new_ephemeral_shared_key;
|
||||
use nym_crypto::symmetric::stream_cipher;
|
||||
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
|
||||
@@ -154,6 +155,133 @@ where
|
||||
Ok(MixPacket::new(first_hop_address, packet, packet_type))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn generate_drop_cover_packet<R>(
|
||||
rng: &mut R,
|
||||
topology: &NymTopology,
|
||||
ack_key: &AckKey,
|
||||
full_address: &Recipient,
|
||||
average_ack_delay: time::Duration,
|
||||
average_packet_delay: time::Duration,
|
||||
packet_size: PacketSize,
|
||||
packet_type: PacketType,
|
||||
) -> Result<MixPacket, CoverMessageError>
|
||||
where
|
||||
R: RngCore + CryptoRng,
|
||||
{
|
||||
// we don't care about total ack delay - we will not be retransmitting it anyway
|
||||
let (_, ack_bytes) = generate_loop_cover_surb_ack(
|
||||
rng,
|
||||
topology,
|
||||
ack_key,
|
||||
full_address,
|
||||
average_ack_delay,
|
||||
packet_type,
|
||||
)?
|
||||
.prepare_for_sending()?;
|
||||
|
||||
// cover message can't be distinguishable from a normal traffic so we have to go through
|
||||
// all the effort of key generation, encryption, etc. Note here we are generating shared key
|
||||
// with ourselves!
|
||||
let (ephemeral_keypair, shared_key) = new_ephemeral_shared_key::<
|
||||
PacketEncryptionAlgorithm,
|
||||
PacketHkdfAlgorithm,
|
||||
_,
|
||||
>(rng, full_address.encryption_key());
|
||||
|
||||
let public_key_bytes = ephemeral_keypair.public_key().to_bytes();
|
||||
let cover_size = packet_size.plaintext_size() - public_key_bytes.len() - ack_bytes.len();
|
||||
|
||||
let mut cover_content: Vec<_> = LOOP_COVER_MESSAGE_PAYLOAD
|
||||
.iter()
|
||||
.cloned()
|
||||
.chain(std::iter::once(1))
|
||||
.chain(std::iter::repeat(0))
|
||||
.take(cover_size)
|
||||
.collect();
|
||||
|
||||
let zero_iv = stream_cipher::zero_iv::<PacketEncryptionAlgorithm>();
|
||||
stream_cipher::encrypt_in_place::<PacketEncryptionAlgorithm>(
|
||||
&shared_key,
|
||||
&zero_iv,
|
||||
&mut cover_content,
|
||||
);
|
||||
|
||||
// combine it together as follows:
|
||||
// SURB_ACK_FIRST_HOP || SURB_ACK_DATA || EPHEMERAL_KEY || COVER_CONTENT
|
||||
// (note: surb_ack_bytes contains SURB_ACK_FIRST_HOP || SURB_ACK_DATA )
|
||||
let packet_payload: Vec<_> = ack_bytes
|
||||
.into_iter()
|
||||
.chain(ephemeral_keypair.public_key().to_bytes().iter().cloned())
|
||||
.chain(cover_content)
|
||||
.collect();
|
||||
|
||||
let route =
|
||||
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
|
||||
|
||||
let client_id_pair = identity::KeyPair::from_bytes(
|
||||
&[
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0,
|
||||
],
|
||||
&[
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let client_enc_pair = encryption::KeyPair::from_bytes(
|
||||
&[
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0,
|
||||
],
|
||||
&[
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let recipient = Recipient::new(
|
||||
*client_id_pair.public_key(),
|
||||
*client_enc_pair.public_key(),
|
||||
*full_address.gateway(),
|
||||
);
|
||||
|
||||
let destination = recipient.as_sphinx_destination();
|
||||
|
||||
let first_hop_address =
|
||||
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
|
||||
|
||||
// once merged, that's an easy rng injection point for sphinx packets : )
|
||||
let packet = match packet_type {
|
||||
PacketType::Mix => NymPacket::sphinx_build(
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?,
|
||||
#[allow(deprecated)]
|
||||
PacketType::Vpn => NymPacket::sphinx_build(
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?,
|
||||
PacketType::Outfox => NymPacket::outfox_build(
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
Some(packet_size.plaintext_size()),
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok(MixPacket::new(first_hop_address, packet, packet_type))
|
||||
}
|
||||
|
||||
/// Helper function used to determine if given message represents a loop cover message.
|
||||
// It kinda seems like there must exist "prefix" or "starts_with" method for bytes
|
||||
// or something, but I couldn't find anything
|
||||
|
||||
@@ -155,7 +155,15 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
let client_address = processed_final_hop.destination;
|
||||
let message = processed_final_hop.message;
|
||||
let forward_ack = processed_final_hop.forward_ack;
|
||||
|
||||
if client_address
|
||||
== DestinationAddressBytes::from_bytes([
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0,
|
||||
])
|
||||
{
|
||||
trace!("NULL address, dropping the packet");
|
||||
return Ok(());
|
||||
}
|
||||
// we failed to push message directly to the client - it's probably offline.
|
||||
// we should store it on the disk instead.
|
||||
match self.try_push_message_to_client(client_address, message) {
|
||||
|
||||
Reference in New Issue
Block a user