Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b7fc1f1497 |
@@ -12,7 +12,7 @@ pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||
pub mod transceiver;
|
||||
|
||||
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
|
||||
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
|
||||
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 8;
|
||||
const MAX_FAILURE_COUNT: usize = 100;
|
||||
|
||||
// that's also disgusting.
|
||||
|
||||
@@ -158,7 +158,7 @@ pub struct LocalGateway {
|
||||
|
||||
// 'sender' part
|
||||
/// Channel responsible for taking mix packets and forwarding them further into the further mixnet layers.
|
||||
packet_forwarder: mpsc::UnboundedSender<MixPacket>,
|
||||
packet_forwarder: mpsc::Sender<MixPacket>,
|
||||
|
||||
// 'receiver' part
|
||||
packet_router_tx: Option<oneshot::Sender<PacketRouter>>,
|
||||
@@ -168,7 +168,7 @@ pub struct LocalGateway {
|
||||
impl LocalGateway {
|
||||
pub fn new(
|
||||
local_identity: identity::PublicKey,
|
||||
packet_forwarder: mpsc::UnboundedSender<MixPacket>,
|
||||
packet_forwarder: mpsc::Sender<MixPacket>,
|
||||
packet_router_tx: oneshot::Sender<PacketRouter>,
|
||||
) -> Self {
|
||||
LocalGateway {
|
||||
@@ -181,6 +181,8 @@ impl LocalGateway {
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod nonwasm_sealed {
|
||||
use futures::SinkExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
impl GatewayTransceiver for LocalGateway {
|
||||
@@ -193,9 +195,12 @@ mod nonwasm_sealed {
|
||||
impl GatewaySender for LocalGateway {
|
||||
async fn send_mix_packet(&mut self, packet: MixPacket) -> Result<(), ErasedGatewayError> {
|
||||
self.packet_forwarder
|
||||
.unbounded_send(packet)
|
||||
.map_err(|err| err.into_send_error())
|
||||
.map_err(erase_err)
|
||||
.send(packet)
|
||||
.await
|
||||
.unwrap();
|
||||
Ok(())
|
||||
// .map_err(|err| err.into_send_error())
|
||||
// .map_err(erase_err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -482,7 +482,7 @@ where
|
||||
} else if packets > 0 {
|
||||
log::info!("{status_str}");
|
||||
} else {
|
||||
log::debug!("{status_str}");
|
||||
log::info!("{status_str}");
|
||||
}
|
||||
|
||||
// Send status message to whoever is listening (possibly UI)
|
||||
@@ -498,7 +498,7 @@ where
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(1));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
||||
@@ -8,8 +8,8 @@ use log::*;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use std::time::Duration;
|
||||
|
||||
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
|
||||
type MixForwardingReceiver = mpsc::UnboundedReceiver<MixPacket>;
|
||||
pub type MixForwardingSender = mpsc::Sender<MixPacket>;
|
||||
type MixForwardingReceiver = mpsc::Receiver<MixPacket>;
|
||||
|
||||
/// A specialisation of client such that it forwards any received packets on the channel into the
|
||||
/// mix network immediately, i.e. will not try to listen for any responses.
|
||||
@@ -36,7 +36,8 @@ impl PacketForwarder {
|
||||
use_legacy_version,
|
||||
);
|
||||
|
||||
let (packet_sender, packet_receiver) = mpsc::unbounded();
|
||||
// let (packet_sender, packet_receiver) = mpsc::unbounded();
|
||||
let (packet_sender, packet_receiver) = mpsc::channel(8);
|
||||
|
||||
(
|
||||
PacketForwarder {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use futures::{
|
||||
future::{FusedFuture, OptionFuture},
|
||||
FutureExt, StreamExt,
|
||||
FutureExt, StreamExt, SinkExt,
|
||||
};
|
||||
use log::*;
|
||||
use nym_gateway_requests::{
|
||||
@@ -203,8 +203,9 @@ where
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||
fn forward_packet(&self, mix_packet: MixPacket) {
|
||||
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
|
||||
async fn forward_packet(&mut self, mix_packet: MixPacket) {
|
||||
// if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
|
||||
if let Err(err) = self.inner.outbound_mix_sender.send(mix_packet).await {
|
||||
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
|
||||
process::exit(1);
|
||||
}
|
||||
@@ -303,7 +304,7 @@ where
|
||||
///
|
||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||
async fn handle_forward_sphinx(
|
||||
&self,
|
||||
&mut self,
|
||||
mix_packet: MixPacket,
|
||||
) -> Result<ServerResponse, RequestHandlingError> {
|
||||
let consumed_bandwidth = mix_packet.packet().len() as i64;
|
||||
@@ -317,7 +318,7 @@ where
|
||||
}
|
||||
|
||||
self.consume_bandwidth(consumed_bandwidth).await?;
|
||||
self.forward_packet(mix_packet);
|
||||
self.forward_packet(mix_packet).await;
|
||||
|
||||
Ok(ServerResponse::Send {
|
||||
remaining_bandwidth: available_bandwidth - consumed_bandwidth,
|
||||
@@ -329,7 +330,7 @@ where
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `bin_msg`: raw message to handle.
|
||||
async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message {
|
||||
async fn handle_binary(&mut self, bin_msg: Vec<u8>) -> Message {
|
||||
// this function decrypts the request and checks the MAC
|
||||
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
|
||||
Err(e) => {
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::node::mixnet_handling::receiver::packet_processing::PacketProcessor;
|
||||
use crate::node::storage::error::StorageError;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::channel::mpsc::SendError;
|
||||
use futures::StreamExt;
|
||||
use futures::{StreamExt, SinkExt};
|
||||
use log::*;
|
||||
use nym_mixnet_client::forwarder::MixForwardingSender;
|
||||
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
|
||||
@@ -128,8 +128,8 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.storage.store_message(client_address, message).await
|
||||
}
|
||||
|
||||
fn forward_ack(
|
||||
&self,
|
||||
async fn forward_ack(
|
||||
&mut self,
|
||||
forward_ack: Option<MixPacket>,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<(), CriticalPacketProcessingError> {
|
||||
@@ -138,12 +138,14 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
trace!("Sending ack from packet for {client_address} to {next_hop}",);
|
||||
|
||||
self.ack_sender
|
||||
.unbounded_send(forward_ack)
|
||||
.map_err(
|
||||
|source| CriticalPacketProcessingError::AckForwardingFailure {
|
||||
source: source.into_send_error(),
|
||||
},
|
||||
)?;
|
||||
// .unbounded_send(forward_ack)
|
||||
.send(forward_ack)
|
||||
.await.unwrap();
|
||||
// .map_err(
|
||||
// |source| CriticalPacketProcessingError::AckForwardingFailure {
|
||||
// source: source.into_send_error(),
|
||||
// },
|
||||
// )?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -172,7 +174,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// its inbox, it means that it must exist at this gateway, hence we can send the
|
||||
// received ack back into the network
|
||||
self.forward_ack(forward_ack, client_address)
|
||||
self.forward_ack(forward_ack, client_address).await
|
||||
}
|
||||
|
||||
async fn handle_received_packet(
|
||||
|
||||
@@ -271,6 +271,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
// this gateway, whenever it has anything to send to its local NR will use fake_client_tx
|
||||
let (nr_mix_sender, nr_mix_receiver) = mpsc::unbounded();
|
||||
// let (nr_mix_sender, nr_mix_receiver) = mpsc::channel(8);
|
||||
let router_shutdown = shutdown.fork("message_router");
|
||||
|
||||
let (router_tx, mut router_rx) = oneshot::channel();
|
||||
@@ -335,6 +336,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
// this gateway, whenever it has anything to send to its local NR will use fake_client_tx
|
||||
let (nr_mix_sender, nr_mix_receiver) = mpsc::unbounded();
|
||||
// let (nr_mix_sender, nr_mix_receiver) = mpsc::channel(8);
|
||||
let router_shutdown = shutdown.fork("message_router");
|
||||
|
||||
let (router_tx, mut router_rx) = oneshot::channel();
|
||||
|
||||
@@ -324,7 +324,7 @@ impl MixnetListener {
|
||||
if !self.request_filter.check_address(&dst).await {
|
||||
log::warn!("Failed filter check: {dst}");
|
||||
// TODO: we could consider sending back a response here
|
||||
return Err(IpPacketRouterError::AddressFailedFilterCheck { addr: dst });
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
// TODO: we should also filter packets without port number
|
||||
|
||||
Reference in New Issue
Block a user