Compare commits

...

1 Commits

Author SHA1 Message Date
Jon Häggblad b7fc1f1497 wip 2023-12-05 22:24:35 +01:00
8 changed files with 39 additions and 28 deletions
@@ -12,7 +12,7 @@ pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
pub mod transceiver;
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 8;
const MAX_FAILURE_COUNT: usize = 100;
// that's also disgusting.
@@ -158,7 +158,7 @@ pub struct LocalGateway {
// 'sender' part
/// Channel responsible for taking mix packets and forwarding them further into the further mixnet layers.
packet_forwarder: mpsc::UnboundedSender<MixPacket>,
packet_forwarder: mpsc::Sender<MixPacket>,
// 'receiver' part
packet_router_tx: Option<oneshot::Sender<PacketRouter>>,
@@ -168,7 +168,7 @@ pub struct LocalGateway {
impl LocalGateway {
pub fn new(
local_identity: identity::PublicKey,
packet_forwarder: mpsc::UnboundedSender<MixPacket>,
packet_forwarder: mpsc::Sender<MixPacket>,
packet_router_tx: oneshot::Sender<PacketRouter>,
) -> Self {
LocalGateway {
@@ -181,6 +181,8 @@ impl LocalGateway {
#[cfg(not(target_arch = "wasm32"))]
mod nonwasm_sealed {
use futures::SinkExt;
use super::*;
impl GatewayTransceiver for LocalGateway {
@@ -193,9 +195,12 @@ mod nonwasm_sealed {
impl GatewaySender for LocalGateway {
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
self.packet_forwarder
.unbounded_send(packet)
.map_err(|err| err.into_send_error())
.map_err(erase_err)
.send(packet)
.await
.unwrap();
Ok(())
// .map_err(|err| err.into_send_error())
// .map_err(erase_err)
}
}
@@ -482,7 +482,7 @@ where
} else if packets > 0 {
log::info!("{status_str}");
} else {
log::debug!("{status_str}");
log::info!("{status_str}");
}
// Send status message to whoever is listening (possibly UI)
@@ -498,7 +498,7 @@ where
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
let mut status_timer = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
@@ -8,8 +8,8 @@ use log::*;
use nym_sphinx::forwarding::packet::MixPacket;
use std::time::Duration;
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
type MixForwardingReceiver = mpsc::UnboundedReceiver<MixPacket>;
pub type MixForwardingSender = mpsc::Sender<MixPacket>;
type MixForwardingReceiver = mpsc::Receiver<MixPacket>;
/// A specialisation of client such that it forwards any received packets on the channel into the
/// mix network immediately, i.e. will not try to listen for any responses.
@@ -36,7 +36,8 @@ impl PacketForwarder {
use_legacy_version,
);
let (packet_sender, packet_receiver) = mpsc::unbounded();
// let (packet_sender, packet_receiver) = mpsc::unbounded();
let (packet_sender, packet_receiver) = mpsc::channel(8);
(
PacketForwarder {
@@ -3,7 +3,7 @@
use futures::{
future::{FusedFuture, OptionFuture},
FutureExt, StreamExt,
FutureExt, StreamExt, SinkExt,
};
use log::*;
use nym_gateway_requests::{
@@ -203,8 +203,9 @@ where
/// # Arguments
///
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
fn forward_packet(&self, mix_packet: MixPacket) {
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
async fn forward_packet(&mut self, mix_packet: MixPacket) {
// if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
if let Err(err) = self.inner.outbound_mix_sender.send(mix_packet).await {
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
process::exit(1);
}
@@ -303,7 +304,7 @@ where
///
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
async fn handle_forward_sphinx(
&self,
&mut self,
mix_packet: MixPacket,
) -> Result<ServerResponse, RequestHandlingError> {
let consumed_bandwidth = mix_packet.packet().len() as i64;
@@ -317,7 +318,7 @@ where
}
self.consume_bandwidth(consumed_bandwidth).await?;
self.forward_packet(mix_packet);
self.forward_packet(mix_packet).await;
Ok(ServerResponse::Send {
remaining_bandwidth: available_bandwidth - consumed_bandwidth,
@@ -329,7 +330,7 @@ where
/// # Arguments
///
/// * `bin_msg`: raw message to handle.
async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message {
async fn handle_binary(&mut self, bin_msg: Vec<u8>) -> Message {
// this function decrypts the request and checks the MAC
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
Err(e) => {
@@ -7,7 +7,7 @@ use crate::node::mixnet_handling::receiver::packet_processing::PacketProcessor;
use crate::node::storage::error::StorageError;
use crate::node::storage::Storage;
use futures::channel::mpsc::SendError;
use futures::StreamExt;
use futures::{StreamExt, SinkExt};
use log::*;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
@@ -128,8 +128,8 @@ impl<St: Storage> ConnectionHandler<St> {
self.storage.store_message(client_address, message).await
}
fn forward_ack(
&self,
async fn forward_ack(
&mut self,
forward_ack: Option<MixPacket>,
client_address: DestinationAddressBytes,
) -> Result<(), CriticalPacketProcessingError> {
@@ -138,12 +138,14 @@ impl<St: Storage> ConnectionHandler<St> {
trace!("Sending ack from packet for {client_address} to {next_hop}",);
self.ack_sender
.unbounded_send(forward_ack)
.map_err(
|source| CriticalPacketProcessingError::AckForwardingFailure {
source: source.into_send_error(),
},
)?;
// .unbounded_send(forward_ack)
.send(forward_ack)
.await.unwrap();
// .map_err(
// |source| CriticalPacketProcessingError::AckForwardingFailure {
// source: source.into_send_error(),
// },
// )?;
}
Ok(())
}
@@ -172,7 +174,7 @@ impl<St: Storage> ConnectionHandler<St> {
// if we managed to either push message directly to the [online] client or store it at
// its inbox, it means that it must exist at this gateway, hence we can send the
// received ack back into the network
self.forward_ack(forward_ack, client_address)
self.forward_ack(forward_ack, client_address).await
}
async fn handle_received_packet(
+2
View File
@@ -271,6 +271,7 @@ impl<St> Gateway<St> {
// this gateway, whenever it has anything to send to its local NR will use fake_client_tx
let (nr_mix_sender, nr_mix_receiver) = mpsc::unbounded();
// let (nr_mix_sender, nr_mix_receiver) = mpsc::channel(8);
let router_shutdown = shutdown.fork("message_router");
let (router_tx, mut router_rx) = oneshot::channel();
@@ -335,6 +336,7 @@ impl<St> Gateway<St> {
// this gateway, whenever it has anything to send to its local NR will use fake_client_tx
let (nr_mix_sender, nr_mix_receiver) = mpsc::unbounded();
// let (nr_mix_sender, nr_mix_receiver) = mpsc::channel(8);
let router_shutdown = shutdown.fork("message_router");
let (router_tx, mut router_rx) = oneshot::channel();
@@ -324,7 +324,7 @@ impl MixnetListener {
if !self.request_filter.check_address(&dst).await {
log::warn!("Failed filter check: {dst}");
// TODO: we could consider sending back a response here
return Err(IpPacketRouterError::AddressFailedFilterCheck { addr: dst });
return Ok(None);
}
} else {
// TODO: we should also filter packets without port number