Compare commits

...

2 Commits

Author SHA1 Message Date
Simon Wicky 228380b070 cargo lock 2024-04-03 17:19:12 +02:00
Simon Wicky 8824a875d9 drop cover traffic 2024-04-03 17:18:11 +02:00
7 changed files with 222 additions and 31 deletions
Generated
+9 -9
View File
@@ -2864,7 +2864,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "explorer-api"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"chrono",
"clap 4.4.7",
@@ -5764,7 +5764,7 @@ dependencies = [
[[package]]
name = "nym-api"
version = "1.1.34"
version = "1.1.35"
dependencies = [
"actix-web",
"anyhow",
@@ -5922,7 +5922,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.33"
version = "1.1.34"
dependencies = [
"anyhow",
"base64 0.13.1",
@@ -5998,7 +5998,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"clap 4.4.7",
"dirs 4.0.0",
@@ -6347,7 +6347,7 @@ dependencies = [
[[package]]
name = "nym-gateway"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"anyhow",
"async-trait",
@@ -6567,7 +6567,7 @@ dependencies = [
[[package]]
name = "nym-mixnode"
version = "1.1.34"
version = "1.1.35"
dependencies = [
"anyhow",
"axum",
@@ -6687,7 +6687,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"addr",
"anyhow",
@@ -6736,7 +6736,7 @@ dependencies = [
[[package]]
name = "nym-network-statistics"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"dirs 4.0.0",
"log",
@@ -6981,7 +6981,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"clap 4.4.7",
"lazy_static",
@@ -290,6 +290,7 @@ where
shutdown: TaskClient,
packet_type: PacketType,
stats_tx: PacketStatisticsReporter,
counter_receiver: mpsc::Receiver<u8>,
) {
info!("Starting real traffic stream...");
@@ -305,12 +306,14 @@ where
lane_queue_lengths,
client_connection_rx,
stats_tx,
counter_receiver,
)
.start_with_shutdown(shutdown, packet_type);
}
// buffer controlling all messages fetched from provider
// required so that other components would be able to use them (say the websocket)
#[allow(clippy::too_many_arguments)]
fn start_received_messages_buffer_controller(
local_encryption_keypair: Arc<encryption::KeyPair>,
query_receiver: ReceivedBufferRequestReceiver,
@@ -319,6 +322,7 @@ where
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
packet_statistics_control: PacketStatisticsReporter,
counter_sender: mpsc::Sender<u8>,
) {
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
@@ -329,6 +333,7 @@ where
reply_key_storage,
reply_controller_sender,
packet_statistics_control,
counter_sender,
);
controller.start_with_shutdown(shutdown)
}
@@ -606,6 +611,9 @@ where
// channels responsible for controlling real messages
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// drop cover traffic counter channel
let (counter_sender, counter_receiver) = mpsc::channel::<u8>(10000);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor = TopologyAccessor::new();
@@ -681,6 +689,7 @@ where
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
packet_stats_reporter.clone(),
counter_sender,
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -720,6 +729,7 @@ where
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
packet_stats_reporter.clone(),
counter_receiver,
);
if !self
@@ -146,6 +146,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: PacketStatisticsReporter,
counter_receiver: mpsc::Receiver<u8>,
) -> Self {
let rng = OsRng;
@@ -204,6 +205,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths,
client_connection_rx,
stats_tx,
counter_receiver,
);
RealMessagesController {
@@ -8,13 +8,14 @@ use crate::client::real_messages_control::acknowledgement_control::SentPacketNot
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use crate::config;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::cover::{generate_drop_cover_packet, generate_loop_cover_packet};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::PreparedFragment;
@@ -117,6 +118,8 @@ where
/// Channel used for sending statistics events to `PacketStatisticsControl`.
stats_tx: PacketStatisticsReporter,
//counter for drop cover traffic
counter_receiver: mpsc::Receiver<u8>,
}
#[derive(Debug)]
@@ -155,7 +158,7 @@ pub(crate) type BatchRealMessageSender =
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<(Vec<RealMessage>, TransmissionLane)>;
pub(crate) enum StreamMessage {
Cover,
Cover(bool),
Real(Box<RealMessage>),
}
@@ -176,6 +179,7 @@ where
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: PacketStatisticsReporter,
counter_receiver: mpsc::Receiver<u8>,
) -> Self {
OutQueueControl {
config,
@@ -190,6 +194,7 @@ where
client_connection_rx,
lane_queue_lengths,
stats_tx,
counter_receiver,
}
}
@@ -221,7 +226,7 @@ where
trace!("created new message");
let (next_message, fragment_id, packet_size) = match next_message {
StreamMessage::Cover => {
StreamMessage::Cover(drop) => {
let cover_traffic_packet_size = self.loop_cover_message_size();
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
@@ -240,24 +245,44 @@ where
return;
}
};
(
generate_loop_cover_packet(
&mut self.rng,
topology_ref,
&self.config.ack_key,
&self.config.our_full_destination,
self.config.average_ack_delay,
self.config.traffic.average_packet_delay,
cover_traffic_packet_size,
self.config.traffic.packet_type,
if drop {
debug!("Sending a drop cover message");
(
generate_drop_cover_packet(
&mut self.rng,
topology_ref,
&self.config.ack_key,
&self.config.our_full_destination,
self.config.average_ack_delay,
self.config.traffic.average_packet_delay,
cover_traffic_packet_size,
self.config.traffic.packet_type,
)
.expect(
"Somehow failed to generate a drop cover message with a valid topology",
),
None,
cover_traffic_packet_size.size(),
)
.expect(
"Somehow failed to generate a loop cover message with a valid topology",
),
None,
cover_traffic_packet_size.size(),
)
} else {
(
generate_loop_cover_packet(
&mut self.rng,
topology_ref,
&self.config.ack_key,
&self.config.our_full_destination,
self.config.average_ack_delay,
self.config.traffic.average_packet_delay,
cover_traffic_packet_size,
self.config.traffic.packet_type,
)
.expect(
"Somehow failed to generate a loop cover message with a valid topology",
),
None,
cover_traffic_packet_size.size(),
)
}
}
StreamMessage::Real(real_message) => {
let packet_size = real_message.packet_size();
@@ -385,6 +410,10 @@ where
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
//For instantaneous drop cover traffic
// if let Ok(_) = self.counter_receiver.try_next() {
// return Poll::Ready(Some(StreamMessage::Cover(true)));
// };
// The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected.
self.adjust_current_average_message_sending_delay();
@@ -419,6 +448,7 @@ where
// in `Vec`, this ensures that on average we will fetch messages faster than we can
// send, which is a condition for being able to multiplex packets from multiple
// data streams.
let need_drop = self.counter_receiver.try_next();
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
@@ -438,7 +468,10 @@ where
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else {
// otherwise construct a dummy one
Poll::Ready(Some(StreamMessage::Cover))
match need_drop {
Ok(_) => Poll::Ready(Some(StreamMessage::Cover(true))),
_ => Poll::Ready(Some(StreamMessage::Cover(false))),
}
}
}
}
@@ -47,6 +47,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
recently_reconstructed: HashSet<i32>,
stats_tx: PacketStatisticsReporter,
counter_sender: mpsc::Sender<u8>,
}
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
@@ -71,6 +72,11 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
.report(PacketStatisticsEvent::RealPacketReceived(
fragment_data_size,
));
if self.counter_sender.try_send(1).is_err() {
{
log::debug!("Failed to send counter message");
}
};
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
Err(err) => {
@@ -164,6 +170,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: PacketStatisticsReporter,
counter_sender: mpsc::Sender<u8>,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -173,6 +180,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
message_sender: None,
recently_reconstructed: HashSet::new(),
stats_tx,
counter_sender,
})),
reply_key_storage,
reply_controller_sender,
@@ -505,12 +513,14 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
packet_statistics_reporter: PacketStatisticsReporter,
counter_sender: mpsc::Sender<u8>,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
packet_statistics_reporter,
counter_sender,
);
ReceivedMessagesBufferController {
+128
View File
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_crypto::asymmetric::{encryption, identity};
use nym_crypto::shared_key::new_ephemeral_shared_key;
use nym_crypto::symmetric::stream_cipher;
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
@@ -154,6 +155,133 @@ where
Ok(MixPacket::new(first_hop_address, packet, packet_type))
}
#[allow(clippy::too_many_arguments)]
pub fn generate_drop_cover_packet<R>(
rng: &mut R,
topology: &NymTopology,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
average_packet_delay: time::Duration,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixPacket, CoverMessageError>
where
R: RngCore + CryptoRng,
{
// we don't care about total ack delay - we will not be retransmitting it anyway
let (_, ack_bytes) = generate_loop_cover_surb_ack(
rng,
topology,
ack_key,
full_address,
average_ack_delay,
packet_type,
)?
.prepare_for_sending()?;
// cover message can't be distinguishable from a normal traffic so we have to go through
// all the effort of key generation, encryption, etc. Note here we are generating shared key
// with ourselves!
let (ephemeral_keypair, shared_key) = new_ephemeral_shared_key::<
PacketEncryptionAlgorithm,
PacketHkdfAlgorithm,
_,
>(rng, full_address.encryption_key());
let public_key_bytes = ephemeral_keypair.public_key().to_bytes();
let cover_size = packet_size.plaintext_size() - public_key_bytes.len() - ack_bytes.len();
let mut cover_content: Vec<_> = LOOP_COVER_MESSAGE_PAYLOAD
.iter()
.cloned()
.chain(std::iter::once(1))
.chain(std::iter::repeat(0))
.take(cover_size)
.collect();
let zero_iv = stream_cipher::zero_iv::<PacketEncryptionAlgorithm>();
stream_cipher::encrypt_in_place::<PacketEncryptionAlgorithm>(
&shared_key,
&zero_iv,
&mut cover_content,
);
// combine it together as follows:
// SURB_ACK_FIRST_HOP || SURB_ACK_DATA || EPHEMERAL_KEY || COVER_CONTENT
// (note: surb_ack_bytes contains SURB_ACK_FIRST_HOP || SURB_ACK_DATA )
let packet_payload: Vec<_> = ack_bytes
.into_iter()
.chain(ephemeral_keypair.public_key().to_bytes().iter().cloned())
.chain(cover_content)
.collect();
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let client_id_pair = identity::KeyPair::from_bytes(
&[
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
],
&[
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
],
)
.unwrap();
let client_enc_pair = encryption::KeyPair::from_bytes(
&[
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
],
&[
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0,
],
)
.unwrap();
let recipient = Recipient::new(
*client_id_pair.public_key(),
*client_enc_pair.public_key(),
*full_address.gateway(),
);
let destination = recipient.as_sphinx_destination();
let first_hop_address =
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
// once merged, that's an easy rng injection point for sphinx packets : )
let packet = match packet_type {
PacketType::Mix => NymPacket::sphinx_build(
packet_size.payload_size(),
packet_payload,
&route,
&destination,
&delays,
)?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_build(
packet_size.payload_size(),
packet_payload,
&route,
&destination,
&delays,
)?,
PacketType::Outfox => NymPacket::outfox_build(
packet_payload,
&route,
&destination,
Some(packet_size.plaintext_size()),
)?,
};
Ok(MixPacket::new(first_hop_address, packet, packet_type))
}
/// Helper function used to determine if given message represents a loop cover message.
// It kinda seems like there must exist "prefix" or "starts_with" method for bytes
// or something, but I couldn't find anything
@@ -155,7 +155,15 @@ impl<St: Storage> ConnectionHandler<St> {
let client_address = processed_final_hop.destination;
let message = processed_final_hop.message;
let forward_ack = processed_final_hop.forward_ack;
if client_address
== DestinationAddressBytes::from_bytes([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0,
])
{
trace!("NULL address, dropping the packet");
return Ok(());
}
// we failed to push message directly to the client - it's probably offline.
// we should store it on the disk instead.
match self.try_push_message_to_client(client_address, message) {