Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| edfb75b9ef | |||
| 298591b9e0 | |||
| 009b2131f6 | |||
| e5af0f5d5e | |||
| 3cad2dbb34 | |||
| e95e33cd70 | |||
| 5281895d5b | |||
| 263db0dbc3 | |||
| 6d44fe818e | |||
| b8ec48cf07 | |||
| 99227e837c | |||
| 34cb142595 | |||
| 2988ae4459 | |||
| e653b632ba | |||
| eb216a06b3 | |||
| 921f01a789 | |||
| 27bf8b2e00 | |||
| e82a669cd3 | |||
| 2015443a90 | |||
| 2c2823f9e1 | |||
| 52b41a5697 | |||
| 9ee6ae44e2 |
Generated
+731
-473
File diff suppressed because it is too large
Load Diff
@@ -333,7 +333,7 @@ where
|
|||||||
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
|
||||||
// The average delay could change depending on if backpressure in the downstream channel
|
// The average delay could change depending on if backpressure in the downstream channel
|
||||||
// (mix_tx) was detected.
|
// (mix_tx) was detected.
|
||||||
self.adjust_current_average_message_sending_delay();
|
//self.adjust_current_average_message_sending_delay();
|
||||||
let avg_delay = self.current_average_message_sending_delay();
|
let avg_delay = self.current_average_message_sending_delay();
|
||||||
|
|
||||||
// Start by checking if we have any incoming messages about closed connections
|
// Start by checking if we have any incoming messages about closed connections
|
||||||
|
|||||||
@@ -697,6 +697,17 @@ pub enum ExtendedPacketSize {
|
|||||||
Extended8,
|
Extended8,
|
||||||
Extended16,
|
Extended16,
|
||||||
Extended32,
|
Extended32,
|
||||||
|
Extended10,
|
||||||
|
Extended15,
|
||||||
|
Extended20,
|
||||||
|
Extended25,
|
||||||
|
Extended50,
|
||||||
|
Extended100,
|
||||||
|
Extended150,
|
||||||
|
Extended200,
|
||||||
|
Extended250,
|
||||||
|
Extended500,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for DebugConfig {
|
impl Default for DebugConfig {
|
||||||
@@ -734,6 +745,17 @@ impl From<ExtendedPacketSize> for PacketSize {
|
|||||||
ExtendedPacketSize::Extended8 => PacketSize::ExtendedPacket8,
|
ExtendedPacketSize::Extended8 => PacketSize::ExtendedPacket8,
|
||||||
ExtendedPacketSize::Extended16 => PacketSize::ExtendedPacket16,
|
ExtendedPacketSize::Extended16 => PacketSize::ExtendedPacket16,
|
||||||
ExtendedPacketSize::Extended32 => PacketSize::ExtendedPacket32,
|
ExtendedPacketSize::Extended32 => PacketSize::ExtendedPacket32,
|
||||||
|
ExtendedPacketSize::Extended10 => PacketSize::ExtendedPacket10,
|
||||||
|
ExtendedPacketSize::Extended15 => PacketSize::ExtendedPacket15,
|
||||||
|
ExtendedPacketSize::Extended20 => PacketSize::ExtendedPacket20,
|
||||||
|
ExtendedPacketSize::Extended25 => PacketSize::ExtendedPacket25,
|
||||||
|
ExtendedPacketSize::Extended50 => PacketSize::ExtendedPacket50,
|
||||||
|
ExtendedPacketSize::Extended100 => PacketSize::ExtendedPacket100,
|
||||||
|
ExtendedPacketSize::Extended150 => PacketSize::ExtendedPacket150,
|
||||||
|
ExtendedPacketSize::Extended200 => PacketSize::ExtendedPacket200,
|
||||||
|
ExtendedPacketSize::Extended250 => PacketSize::ExtendedPacket250,
|
||||||
|
ExtendedPacketSize::Extended500 => PacketSize::ExtendedPacket500,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ impl PacketRouter {
|
|||||||
received_messages.push(received_packet);
|
received_messages.push(received_packet);
|
||||||
} else {
|
} else {
|
||||||
// this can happen if other clients are not padding their messages
|
// this can happen if other clients are not padding their messages
|
||||||
warn!("Received message of unexpected size. Probably from an outdated client... len: {}", received_packet.len());
|
//warn!("Received message of unexpected size. Probably from an outdated client... len: {}", received_packet.len());
|
||||||
received_messages.push(received_packet);
|
received_messages.push(received_packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
tracing = "0.1.37"
|
||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
|
tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
|
||||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||||
|
|||||||
@@ -3,11 +3,12 @@
|
|||||||
|
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use nym_sphinx::framing::codec::SphinxCodec;
|
use nym_sphinx::framing::codec::SphinxCodec;
|
||||||
use nym_sphinx::framing::packet::FramedSphinxPacket;
|
use nym_sphinx::framing::packet::FramedSphinxPacket;
|
||||||
use nym_sphinx::params::PacketMode;
|
use nym_sphinx::params::PacketMode;
|
||||||
use nym_sphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket};
|
use nym_sphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket};
|
||||||
|
use nym_sphinx::params::packet_sizes::PacketSize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
@@ -197,6 +198,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SendWithoutResponse for Client {
|
impl SendWithoutResponse for Client {
|
||||||
|
#[instrument(level="info", skip(self, packet), "Sending packet to mixnet", fields(packet_size))]
|
||||||
fn send_without_response(
|
fn send_without_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
address: NymNodeRoutingAddress,
|
address: NymNodeRoutingAddress,
|
||||||
@@ -204,13 +206,15 @@ impl SendWithoutResponse for Client {
|
|||||||
packet_mode: PacketMode,
|
packet_mode: PacketMode,
|
||||||
) -> io::Result<()> {
|
) -> io::Result<()> {
|
||||||
trace!("Sending packet to {:?}", address);
|
trace!("Sending packet to {:?}", address);
|
||||||
|
let packet_size = PacketSize::get_type(packet.len()).unwrap();
|
||||||
|
Span::current().record("packet_size", field::debug(packet_size));
|
||||||
let framed_packet =
|
let framed_packet =
|
||||||
FramedSphinxPacket::new(packet, packet_mode, self.config.use_legacy_version);
|
FramedSphinxPacket::new(packet, packet_mode, self.config.use_legacy_version);
|
||||||
|
|
||||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||||
if err.is_full() {
|
if err.is_full() {
|
||||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
info!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||||
// it's not a 'big' error, but we did not manage to send the packet
|
// it's not a 'big' error, but we did not manage to send the packet
|
||||||
// if the queue is full, we can't really do anything but to drop the packet
|
// if the queue is full, we can't really do anything but to drop the packet
|
||||||
Err(io::Error::new(
|
Err(io::Error::new(
|
||||||
|
|||||||
@@ -4,8 +4,8 @@
|
|||||||
use crate::client::{Client, Config, SendWithoutResponse};
|
use crate::client::{Client, Config, SendWithoutResponse};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
|
||||||
use nym_sphinx::forwarding::packet::MixPacket;
|
use nym_sphinx::forwarding::packet::MixPacket;
|
||||||
|
use tracing::*;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
|
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
|
||||||
@@ -53,10 +53,10 @@ impl PacketForwarder {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
log::trace!("PacketForwarder: Received shutdown");
|
trace!("PacketForwarder: Received shutdown");
|
||||||
}
|
}
|
||||||
Some(mix_packet) = self.packet_receiver.next() => {
|
Some(mix_packet) = self.packet_receiver.next() => {
|
||||||
trace!("Going to forward packet to {:?}", mix_packet.next_hop());
|
trace!("Going to forward packet to {:?}", mix_packet.next_hop());
|
||||||
|
|
||||||
let next_hop = mix_packet.next_hop();
|
let next_hop = mix_packet.next_hop();
|
||||||
let packet_mode = mix_packet.packet_mode();
|
let packet_mode = mix_packet.packet_mode();
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ serde = { version = "1.0", features = ["derive"] }
|
|||||||
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] }
|
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] }
|
||||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||||
url = "2.2"
|
url = "2.2"
|
||||||
|
tracing = "0.1.37"
|
||||||
thiserror = "1.0.37"
|
thiserror = "1.0.37"
|
||||||
|
|
||||||
nym-crypto = { path = "../crypto" }
|
nym-crypto = { path = "../crypto" }
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::packet_processor::error::MixProcessingError;
|
use crate::packet_processor::error::MixProcessingError;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use nym_sphinx_acknowledgements::surb_ack::SurbAck;
|
use nym_sphinx_acknowledgements::surb_ack::SurbAck;
|
||||||
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
|
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
|
||||||
use nym_sphinx_forwarding::packet::MixPacket;
|
use nym_sphinx_forwarding::packet::MixPacket;
|
||||||
@@ -57,6 +57,7 @@ impl SphinxPacketProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Takes the received framed packet and tries to unwrap it from the sphinx encryption.
|
/// Takes the received framed packet and tries to unwrap it from the sphinx encryption.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn perform_initial_unwrapping(
|
fn perform_initial_unwrapping(
|
||||||
&self,
|
&self,
|
||||||
received: FramedSphinxPacket,
|
received: FramedSphinxPacket,
|
||||||
@@ -73,6 +74,7 @@ impl SphinxPacketProcessor {
|
|||||||
|
|
||||||
/// Processed received forward hop packet - tries to extract next hop address, sets delay
|
/// Processed received forward hop packet - tries to extract next hop address, sets delay
|
||||||
/// and packs all the data in a way that can be easily sent to the next hop.
|
/// and packs all the data in a way that can be easily sent to the next hop.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn process_forward_hop(
|
fn process_forward_hop(
|
||||||
&self,
|
&self,
|
||||||
packet: SphinxPacket,
|
packet: SphinxPacket,
|
||||||
@@ -88,6 +90,7 @@ impl SphinxPacketProcessor {
|
|||||||
|
|
||||||
/// Split data extracted from the final hop sphinx packet into a SURBAck and message
|
/// Split data extracted from the final hop sphinx packet into a SURBAck and message
|
||||||
/// that should get delivered to a client.
|
/// that should get delivered to a client.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn split_hop_data_into_ack_and_message(
|
fn split_hop_data_into_ack_and_message(
|
||||||
&self,
|
&self,
|
||||||
mut extracted_data: Vec<u8>,
|
mut extracted_data: Vec<u8>,
|
||||||
@@ -105,6 +108,7 @@ impl SphinxPacketProcessor {
|
|||||||
|
|
||||||
/// Tries to extract a SURBAck that could be sent back into the mix network and message
|
/// Tries to extract a SURBAck that could be sent back into the mix network and message
|
||||||
/// that should get delivered to a client from received Sphinx packet.
|
/// that should get delivered to a client from received Sphinx packet.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn split_into_ack_and_message(
|
fn split_into_ack_and_message(
|
||||||
&self,
|
&self,
|
||||||
data: Vec<u8>,
|
data: Vec<u8>,
|
||||||
@@ -116,10 +120,7 @@ impl SphinxPacketProcessor {
|
|||||||
trace!("received an ack packet!");
|
trace!("received an ack packet!");
|
||||||
Ok((None, data))
|
Ok((None, data))
|
||||||
}
|
}
|
||||||
PacketSize::RegularPacket
|
_ => {
|
||||||
| PacketSize::ExtendedPacket8
|
|
||||||
| PacketSize::ExtendedPacket16
|
|
||||||
| PacketSize::ExtendedPacket32 => {
|
|
||||||
trace!("received a normal packet!");
|
trace!("received a normal packet!");
|
||||||
let (ack_data, message) = self.split_hop_data_into_ack_and_message(data)?;
|
let (ack_data, message) = self.split_hop_data_into_ack_and_message(data)?;
|
||||||
let (ack_first_hop, ack_packet) = SurbAck::try_recover_first_hop_packet(&ack_data)?;
|
let (ack_first_hop, ack_packet) = SurbAck::try_recover_first_hop_packet(&ack_data)?;
|
||||||
@@ -132,6 +133,7 @@ impl SphinxPacketProcessor {
|
|||||||
/// Processed received final hop packet - tries to extract SURBAck out of it (assuming the
|
/// Processed received final hop packet - tries to extract SURBAck out of it (assuming the
|
||||||
/// packet itself is not an ACK) and splits it from the message that should get delivered
|
/// packet itself is not an ACK) and splits it from the message that should get delivered
|
||||||
/// to the destination.
|
/// to the destination.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn process_final_hop(
|
fn process_final_hop(
|
||||||
&self,
|
&self,
|
||||||
destination: DestinationAddressBytes,
|
destination: DestinationAddressBytes,
|
||||||
@@ -153,6 +155,7 @@ impl SphinxPacketProcessor {
|
|||||||
|
|
||||||
/// Performs final processing for the unwrapped packet based on whether it was a forward hop
|
/// Performs final processing for the unwrapped packet based on whether it was a forward hop
|
||||||
/// or a final hop.
|
/// or a final hop.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn perform_final_processing(
|
fn perform_final_processing(
|
||||||
&self,
|
&self,
|
||||||
packet: ProcessedPacket,
|
packet: ProcessedPacket,
|
||||||
@@ -170,7 +173,7 @@ impl SphinxPacketProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all, fields(packet_size=?received.packet_size()))]
|
||||||
pub fn process_received(
|
pub fn process_received(
|
||||||
&self,
|
&self,
|
||||||
received: FramedSphinxPacket,
|
received: FramedSphinxPacket,
|
||||||
|
|||||||
@@ -20,6 +20,16 @@ const ACK_PACKET_SIZE: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + ACK_IV_SIZE
|
|||||||
const EXTENDED_PACKET_SIZE_8: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 8 * 1024;
|
const EXTENDED_PACKET_SIZE_8: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 8 * 1024;
|
||||||
const EXTENDED_PACKET_SIZE_16: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 16 * 1024;
|
const EXTENDED_PACKET_SIZE_16: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 16 * 1024;
|
||||||
const EXTENDED_PACKET_SIZE_32: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 32 * 1024;
|
const EXTENDED_PACKET_SIZE_32: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 32 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_10: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 10 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_15: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 15 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_20: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 20 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_25: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 25 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_50: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 50 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_100: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 100 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_150: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 150 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_200: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 200 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_250: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 250 * 1024;
|
||||||
|
const EXTENDED_PACKET_SIZE_500: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 500 * 1024;
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum InvalidPacketSize {
|
pub enum InvalidPacketSize {
|
||||||
@@ -51,6 +61,18 @@ pub enum PacketSize {
|
|||||||
|
|
||||||
// for example for streaming fast and furious in compressed XviD quality
|
// for example for streaming fast and furious in compressed XviD quality
|
||||||
ExtendedPacket16 = 5,
|
ExtendedPacket16 = 5,
|
||||||
|
|
||||||
|
ExtendedPacket10 = 6,
|
||||||
|
ExtendedPacket15 = 7,
|
||||||
|
ExtendedPacket20 = 8,
|
||||||
|
|
||||||
|
ExtendedPacket25 = 9,
|
||||||
|
ExtendedPacket50 = 10,
|
||||||
|
ExtendedPacket100 = 11,
|
||||||
|
ExtendedPacket150 = 12,
|
||||||
|
ExtendedPacket200 = 13,
|
||||||
|
ExtendedPacket250 = 14,
|
||||||
|
ExtendedPacket500 = 15,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromStr for PacketSize {
|
impl FromStr for PacketSize {
|
||||||
@@ -63,6 +85,16 @@ impl FromStr for PacketSize {
|
|||||||
"extended8" => Ok(Self::ExtendedPacket8),
|
"extended8" => Ok(Self::ExtendedPacket8),
|
||||||
"extended16" => Ok(Self::ExtendedPacket16),
|
"extended16" => Ok(Self::ExtendedPacket16),
|
||||||
"extended32" => Ok(Self::ExtendedPacket32),
|
"extended32" => Ok(Self::ExtendedPacket32),
|
||||||
|
"extended10" => Ok(Self::ExtendedPacket10),
|
||||||
|
"extended15" => Ok(Self::ExtendedPacket15),
|
||||||
|
"extended20" => Ok(Self::ExtendedPacket20),
|
||||||
|
"extended25" => Ok(Self::ExtendedPacket25),
|
||||||
|
"extended50" => Ok(Self::ExtendedPacket50),
|
||||||
|
"extended100" => Ok(Self::ExtendedPacket100),
|
||||||
|
"extended150" => Ok(Self::ExtendedPacket150),
|
||||||
|
"extended200" => Ok(Self::ExtendedPacket200),
|
||||||
|
"extended250" => Ok(Self::ExtendedPacket250),
|
||||||
|
"extended500" => Ok(Self::ExtendedPacket500),
|
||||||
s => Err(InvalidPacketSize::UnknownExtendedPacketVariant {
|
s => Err(InvalidPacketSize::UnknownExtendedPacketVariant {
|
||||||
received: s.to_string(),
|
received: s.to_string(),
|
||||||
}),
|
}),
|
||||||
@@ -80,6 +112,16 @@ impl TryFrom<u8> for PacketSize {
|
|||||||
_ if value == (PacketSize::ExtendedPacket8 as u8) => Ok(Self::ExtendedPacket8),
|
_ if value == (PacketSize::ExtendedPacket8 as u8) => Ok(Self::ExtendedPacket8),
|
||||||
_ if value == (PacketSize::ExtendedPacket16 as u8) => Ok(Self::ExtendedPacket16),
|
_ if value == (PacketSize::ExtendedPacket16 as u8) => Ok(Self::ExtendedPacket16),
|
||||||
_ if value == (PacketSize::ExtendedPacket32 as u8) => Ok(Self::ExtendedPacket32),
|
_ if value == (PacketSize::ExtendedPacket32 as u8) => Ok(Self::ExtendedPacket32),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket10 as u8) => Ok(Self::ExtendedPacket10),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket15 as u8) => Ok(Self::ExtendedPacket15),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket20 as u8) => Ok(Self::ExtendedPacket20),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket25 as u8) => Ok(Self::ExtendedPacket25),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket50 as u8) => Ok(Self::ExtendedPacket50),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket100 as u8) => Ok(Self::ExtendedPacket100),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket150 as u8) => Ok(Self::ExtendedPacket150),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket200 as u8) => Ok(Self::ExtendedPacket200),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket250 as u8) => Ok(Self::ExtendedPacket250),
|
||||||
|
_ if value == (PacketSize::ExtendedPacket500 as u8) => Ok(Self::ExtendedPacket500),
|
||||||
v => Err(InvalidPacketSize::UnknownPacketTag { received: v }),
|
v => Err(InvalidPacketSize::UnknownPacketTag { received: v }),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -93,6 +135,16 @@ impl PacketSize {
|
|||||||
PacketSize::ExtendedPacket8 => EXTENDED_PACKET_SIZE_8,
|
PacketSize::ExtendedPacket8 => EXTENDED_PACKET_SIZE_8,
|
||||||
PacketSize::ExtendedPacket16 => EXTENDED_PACKET_SIZE_16,
|
PacketSize::ExtendedPacket16 => EXTENDED_PACKET_SIZE_16,
|
||||||
PacketSize::ExtendedPacket32 => EXTENDED_PACKET_SIZE_32,
|
PacketSize::ExtendedPacket32 => EXTENDED_PACKET_SIZE_32,
|
||||||
|
PacketSize::ExtendedPacket10 => EXTENDED_PACKET_SIZE_10,
|
||||||
|
PacketSize::ExtendedPacket15 => EXTENDED_PACKET_SIZE_15,
|
||||||
|
PacketSize::ExtendedPacket20 => EXTENDED_PACKET_SIZE_20,
|
||||||
|
PacketSize::ExtendedPacket25 => EXTENDED_PACKET_SIZE_25,
|
||||||
|
PacketSize::ExtendedPacket50 => EXTENDED_PACKET_SIZE_50,
|
||||||
|
PacketSize::ExtendedPacket100 => EXTENDED_PACKET_SIZE_100,
|
||||||
|
PacketSize::ExtendedPacket150 => EXTENDED_PACKET_SIZE_150,
|
||||||
|
PacketSize::ExtendedPacket200 => EXTENDED_PACKET_SIZE_200,
|
||||||
|
PacketSize::ExtendedPacket250 => EXTENDED_PACKET_SIZE_250,
|
||||||
|
PacketSize::ExtendedPacket500 => EXTENDED_PACKET_SIZE_500,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,6 +167,26 @@ impl PacketSize {
|
|||||||
Ok(PacketSize::ExtendedPacket16)
|
Ok(PacketSize::ExtendedPacket16)
|
||||||
} else if PacketSize::ExtendedPacket32.size() == size {
|
} else if PacketSize::ExtendedPacket32.size() == size {
|
||||||
Ok(PacketSize::ExtendedPacket32)
|
Ok(PacketSize::ExtendedPacket32)
|
||||||
|
} else if PacketSize::ExtendedPacket10.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket10)
|
||||||
|
} else if PacketSize::ExtendedPacket15.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket15)
|
||||||
|
} else if PacketSize::ExtendedPacket20.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket20)
|
||||||
|
} else if PacketSize::ExtendedPacket25.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket25)
|
||||||
|
} else if PacketSize::ExtendedPacket50.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket50)
|
||||||
|
} else if PacketSize::ExtendedPacket100.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket100)
|
||||||
|
} else if PacketSize::ExtendedPacket150.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket150)
|
||||||
|
} else if PacketSize::ExtendedPacket200.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket200)
|
||||||
|
} else if PacketSize::ExtendedPacket250.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket250)
|
||||||
|
} else if PacketSize::ExtendedPacket500.size() == size {
|
||||||
|
Ok(PacketSize::ExtendedPacket500)
|
||||||
} else {
|
} else {
|
||||||
Err(InvalidPacketSize::UnknownPacketSize { received: size })
|
Err(InvalidPacketSize::UnknownPacketSize { received: size })
|
||||||
}
|
}
|
||||||
@@ -125,7 +197,17 @@ impl PacketSize {
|
|||||||
PacketSize::RegularPacket | PacketSize::AckPacket => false,
|
PacketSize::RegularPacket | PacketSize::AckPacket => false,
|
||||||
PacketSize::ExtendedPacket8
|
PacketSize::ExtendedPacket8
|
||||||
| PacketSize::ExtendedPacket16
|
| PacketSize::ExtendedPacket16
|
||||||
| PacketSize::ExtendedPacket32 => true,
|
| PacketSize::ExtendedPacket32
|
||||||
|
| PacketSize::ExtendedPacket10
|
||||||
|
| PacketSize::ExtendedPacket15
|
||||||
|
| PacketSize::ExtendedPacket20
|
||||||
|
| PacketSize::ExtendedPacket25
|
||||||
|
| PacketSize::ExtendedPacket50
|
||||||
|
| PacketSize::ExtendedPacket100
|
||||||
|
| PacketSize::ExtendedPacket150
|
||||||
|
| PacketSize::ExtendedPacket200
|
||||||
|
| PacketSize::ExtendedPacket250
|
||||||
|
| PacketSize::ExtendedPacket500 => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,16 @@ impl From<u8> for PacketVersion {
|
|||||||
n if n == PacketSize::ExtendedPacket8 as u8 => PacketVersion::Legacy,
|
n if n == PacketSize::ExtendedPacket8 as u8 => PacketVersion::Legacy,
|
||||||
n if n == PacketSize::ExtendedPacket16 as u8 => PacketVersion::Legacy,
|
n if n == PacketSize::ExtendedPacket16 as u8 => PacketVersion::Legacy,
|
||||||
n if n == PacketSize::ExtendedPacket32 as u8 => PacketVersion::Legacy,
|
n if n == PacketSize::ExtendedPacket32 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket10 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket15 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket20 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket25 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket50 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket100 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket150 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket200 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket250 as u8 => PacketVersion::Legacy,
|
||||||
|
n if n == PacketSize::ExtendedPacket500 as u8 => PacketVersion::Legacy,
|
||||||
n => PacketVersion::Versioned(n),
|
n => PacketVersion::Versioned(n),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,6 +50,12 @@ tokio-stream = { version = "0.1.11", features = ["fs"] }
|
|||||||
tokio-tungstenite = "0.14"
|
tokio-tungstenite = "0.14"
|
||||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||||
url = { version = "2.2", features = ["serde"] }
|
url = { version = "2.2", features = ["serde"] }
|
||||||
|
tracing = "0.1.37"
|
||||||
|
tracing-subscriber = {version = "0.3.16", features = ["registry", "std", "env-filter"]}
|
||||||
|
tracing-opentelemetry = "0.18.0"
|
||||||
|
opentelemetry-jaeger = {version = "0.17.0", features = ["collector_client","rt-tokio","isahc_collector_client"]}
|
||||||
|
opentelemetry = {version = "0.18.0", features = ["rt-tokio"]}
|
||||||
|
tracing-flame = "0.2.0"
|
||||||
|
|
||||||
# internal
|
# internal
|
||||||
nym-coconut-interface = { path = "../common/coconut-interface" }
|
nym-coconut-interface = { path = "../common/coconut-interface" }
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] }
|
|||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
|
tracing = "0.1.29"
|
||||||
|
|
||||||
nym-crypto = { path = "../../common/crypto" }
|
nym-crypto = { path = "../../common/crypto" }
|
||||||
nym-pemstore = { path = "../../common/pemstore" }
|
nym-pemstore = { path = "../../common/pemstore" }
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ use crate::registration::handshake::shared_key::{SharedKeySize, SharedKeys};
|
|||||||
use crate::registration::handshake::WsItem;
|
use crate::registration::handshake::WsItem;
|
||||||
use crate::types;
|
use crate::types;
|
||||||
use futures::{Sink, SinkExt, Stream, StreamExt};
|
use futures::{Sink, SinkExt, Stream, StreamExt};
|
||||||
use log::*;
|
|
||||||
use nym_crypto::{
|
use nym_crypto::{
|
||||||
asymmetric::{encryption, identity},
|
asymmetric::{encryption, identity},
|
||||||
generic_array::typenum::Unsigned,
|
generic_array::typenum::Unsigned,
|
||||||
@@ -14,6 +13,7 @@ use nym_crypto::{
|
|||||||
symmetric::stream_cipher,
|
symmetric::stream_cipher,
|
||||||
};
|
};
|
||||||
use nym_sphinx::params::{GatewayEncryptionAlgorithm, GatewaySharedKeyHkdfAlgorithm};
|
use nym_sphinx::params::{GatewayEncryptionAlgorithm, GatewaySharedKeyHkdfAlgorithm};
|
||||||
|
use tracing::*;
|
||||||
use rand::{CryptoRng, RngCore};
|
use rand::{CryptoRng, RngCore};
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use tungstenite::Message as WsMessage;
|
use tungstenite::Message as WsMessage;
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ use crate::{
|
|||||||
OutputFormat,
|
OutputFormat,
|
||||||
};
|
};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
|
use config::NymConfig;
|
||||||
|
use tracing::*;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -115,19 +117,19 @@ fn special_addresses() -> Vec<&'static str> {
|
|||||||
pub async fn execute(args: Run, output: OutputFormat) -> Result<(), Box<dyn Error + Send + Sync>> {
|
pub async fn execute(args: Run, output: OutputFormat) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let id = args.id.clone();
|
let id = args.id.clone();
|
||||||
println!("Starting gateway {id}...");
|
println!("Starting gateway {id}...");
|
||||||
|
event!(Level::INFO, "Loading config");
|
||||||
let config = build_config(id, args)?;
|
let config = build_config(id, args)?;
|
||||||
ensure_config_version_compatibility(&config)?;
|
ensure_config_version_compatibility(&config)?;
|
||||||
|
|
||||||
if special_addresses().contains(&&*config.get_listening_address().to_string()) {
|
if special_addresses().contains(&&*config.get_listening_address().to_string()) {
|
||||||
show_binding_warning(config.get_listening_address().to_string());
|
show_binding_warning(config.get_listening_address().to_string());
|
||||||
}
|
}
|
||||||
|
event!(Level::INFO, "Creating Gateway node");
|
||||||
let mut gateway = crate::node::create_gateway(config).await;
|
let mut gateway = crate::node::create_gateway(config).await;
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\
|
"\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\
|
||||||
Select the correct version and install it to your machine. You will need to provide the following: \n ");
|
Select the correct version and install it to your machine. You will need to provide the following: \n ");
|
||||||
gateway.print_node_details(output)?;
|
gateway.print_node_details(output)?;
|
||||||
|
//.instrument(info_span!("Gateway run"))
|
||||||
gateway.run().await
|
gateway.run().await
|
||||||
}
|
}
|
||||||
|
|||||||
+40
-4
@@ -4,11 +4,18 @@
|
|||||||
use clap::{crate_name, crate_version, Parser, ValueEnum};
|
use clap::{crate_name, crate_version, Parser, ValueEnum};
|
||||||
use colored::Colorize;
|
use colored::Colorize;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::error;
|
|
||||||
use nym_bin_common::logging::setup_logging;
|
use nym_bin_common::logging::setup_logging;
|
||||||
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
|
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
|
||||||
use nym_network_defaults::setup_env;
|
use nym_network_defaults::setup_env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
|
use tracing_subscriber::{EnvFilter, Registry, filter};
|
||||||
|
use tracing::*;
|
||||||
|
use tracing_flame::FlameLayer;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::thread::sleep;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
mod commands;
|
mod commands;
|
||||||
mod config;
|
mod config;
|
||||||
@@ -64,14 +71,39 @@ impl Cli {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
setup_logging();
|
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||||
|
.with_endpoint("143.42.21.138:6831")
|
||||||
|
.with_service_name("nym_gateway")
|
||||||
|
.with_auto_split_batch(true)
|
||||||
|
.install_batch(opentelemetry::runtime::Tokio)
|
||||||
|
.expect("Failed to initialize tracer");
|
||||||
|
|
||||||
|
//let jaeger_layer = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
|
let hyper_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("hyper")});
|
||||||
|
let tokio_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("tokio")});
|
||||||
|
|
||||||
|
let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded").unwrap();
|
||||||
|
|
||||||
|
let subscriber = Registry::default()
|
||||||
|
.with(EnvFilter::from_default_env())
|
||||||
|
.with(hyper_filter)
|
||||||
|
.with(tokio_filter)
|
||||||
|
.with(tracing_subscriber::fmt::layer().pretty())
|
||||||
|
.with(flame_layer);
|
||||||
|
//.with(jaeger_layer)
|
||||||
|
|
||||||
|
|
||||||
|
// tracing::subscriber::set_global_default(subscriber)
|
||||||
|
// .expect("Failed to set global subscriber");
|
||||||
|
//tracing_subscriber::fmt::init();
|
||||||
|
//setup_logging();
|
||||||
if atty::is(atty::Stream::Stdout) {
|
if atty::is(atty::Stream::Stdout) {
|
||||||
println!("{}", banner(crate_name!(), crate_version!()));
|
println!("{}", banner(crate_name!(), crate_version!()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let args = Cli::parse();
|
let args = Cli::parse();
|
||||||
setup_env(args.config_env_file.as_ref());
|
setup_env(args.config_env_file.as_ref());
|
||||||
|
//.instrument(info_span!("Execute Run"))
|
||||||
commands::execute(args).await.map_err(|err| {
|
commands::execute(args).await.map_err(|err| {
|
||||||
if atty::is(atty::Stream::Stdout) {
|
if atty::is(atty::Stream::Stdout) {
|
||||||
let error_message = format!("{err}").red();
|
let error_message = format!("{err}").red();
|
||||||
@@ -79,7 +111,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|||||||
error!("Exiting...");
|
error!("Exiting...");
|
||||||
}
|
}
|
||||||
err
|
err
|
||||||
})
|
});
|
||||||
|
//sleep(Duration::from_secs(5));
|
||||||
|
//opentelemetry::global::shutdown_tracer_provider();
|
||||||
|
Ok(())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ use futures::StreamExt;
|
|||||||
use gateway_requests::iv::IVConversionError;
|
use gateway_requests::iv::IVConversionError;
|
||||||
use gateway_requests::types::{BinaryRequest, ServerResponse};
|
use gateway_requests::types::{BinaryRequest, ServerResponse};
|
||||||
use gateway_requests::{ClientControlRequest, GatewayRequestsError};
|
use gateway_requests::{ClientControlRequest, GatewayRequestsError};
|
||||||
use log::*;
|
|
||||||
use nym_sphinx::forwarding::packet::MixPacket;
|
use nym_sphinx::forwarding::packet::MixPacket;
|
||||||
|
use tracing::*;
|
||||||
use rand::{CryptoRng, Rng};
|
use rand::{CryptoRng, Rng};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::process;
|
use std::process;
|
||||||
@@ -132,6 +132,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Explicitly removes handle from the global store.
|
/// Explicitly removes handle from the global store.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn disconnect(self) {
|
fn disconnect(self) {
|
||||||
self.inner
|
self.inner
|
||||||
.active_clients_store
|
.active_clients_store
|
||||||
@@ -180,6 +181,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn forward_packet(&self, mix_packet: MixPacket) {
|
fn forward_packet(&self, mix_packet: MixPacket) {
|
||||||
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
|
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
|
||||||
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
|
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
|
||||||
@@ -279,13 +281,14 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
async fn handle_forward_sphinx(
|
async fn handle_forward_sphinx(
|
||||||
&self,
|
&self,
|
||||||
mix_packet: MixPacket,
|
mix_packet: MixPacket,
|
||||||
) -> Result<ServerResponse, RequestHandlingError> {
|
) -> Result<ServerResponse, RequestHandlingError> {
|
||||||
let consumed_bandwidth = mix_packet.sphinx_packet().len() as i64;
|
let consumed_bandwidth = mix_packet.sphinx_packet().len() as i64;
|
||||||
|
|
||||||
let available_bandwidth = self.get_available_bandwidth().await?;
|
let available_bandwidth = self.get_available_bandwidth().instrument(trace_span!("Get available bandwidth")).await?;
|
||||||
|
|
||||||
if available_bandwidth < consumed_bandwidth {
|
if available_bandwidth < consumed_bandwidth {
|
||||||
return Ok(ServerResponse::new_error(
|
return Ok(ServerResponse::new_error(
|
||||||
@@ -293,7 +296,7 @@ where
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.consume_bandwidth(consumed_bandwidth).await?;
|
self.consume_bandwidth(consumed_bandwidth).instrument(trace_span!("Consume bandwidth")).await?;
|
||||||
self.forward_packet(mix_packet);
|
self.forward_packet(mix_packet);
|
||||||
|
|
||||||
Ok(ServerResponse::Send {
|
Ok(ServerResponse::Send {
|
||||||
@@ -306,6 +309,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `bin_msg`: raw message to handle.
|
/// * `bin_msg`: raw message to handle.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message {
|
async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message {
|
||||||
// this function decrypts the request and checks the MAC
|
// this function decrypts the request and checks the MAC
|
||||||
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
|
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
|
||||||
@@ -327,6 +331,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `raw_request`: raw message to handle.
|
/// * `raw_request`: raw message to handle.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
async fn handle_text(&mut self, raw_request: String) -> Message {
|
async fn handle_text(&mut self, raw_request: String) -> Message {
|
||||||
match ClientControlRequest::try_from(raw_request) {
|
match ClientControlRequest::try_from(raw_request) {
|
||||||
Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(),
|
Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(),
|
||||||
@@ -349,6 +354,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `raw_request`: raw received websocket message.
|
/// * `raw_request`: raw received websocket message.
|
||||||
|
//#[instrument(level="info", skip_all)]
|
||||||
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
|
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
|
||||||
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
|
||||||
// them and let's test that claim. If that's not the case, just copy code from
|
// them and let's test that claim. If that's not the case, just copy code from
|
||||||
@@ -363,6 +369,7 @@ where
|
|||||||
/// Simultaneously listens for incoming client requests, which realistically should only be
|
/// Simultaneously listens for incoming client requests, which realistically should only be
|
||||||
/// binary requests to forward sphinx packets or increase bandwidth
|
/// binary requests to forward sphinx packets or increase bandwidth
|
||||||
/// and for sphinx packets received from the mix network that should be sent back to the client.
|
/// and for sphinx packets received from the mix network that should be sent back to the client.
|
||||||
|
#[instrument(level="info", skip_all, name="Serving requests")]
|
||||||
pub(crate) async fn listen_for_requests(mut self, mut shutdown: TaskClient)
|
pub(crate) async fn listen_for_requests(mut self, mut shutdown: TaskClient)
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
@@ -373,14 +380,16 @@ where
|
|||||||
while !shutdown.is_shutdown() {
|
while !shutdown.is_shutdown() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
log::trace!("client_handling::AuthenticatedHandler: received shutdown");
|
trace!("client_handling::AuthenticatedHandler: received shutdown");
|
||||||
}
|
}
|
||||||
socket_msg = self.inner.read_websocket_message() => {
|
socket_msg = self.inner.read_websocket_message() => {
|
||||||
|
|
||||||
|
debug!("Handling client request");
|
||||||
let socket_msg = match socket_msg {
|
let socket_msg = match socket_msg {
|
||||||
None => break,
|
None => break,
|
||||||
Some(Ok(socket_msg)) => socket_msg,
|
Some(Ok(socket_msg)) => socket_msg,
|
||||||
Some(Err(err)) => {
|
Some(Err(err)) => {
|
||||||
error!("failed to obtain message from websocket stream! stopping connection handler: {err}");
|
log::error!("failed to obtain message from websocket stream! stopping connection handler: {err}");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -390,6 +399,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(response) = self.handle_request(socket_msg).await {
|
if let Some(response) = self.handle_request(socket_msg).await {
|
||||||
|
trace!("Sending response to client request");
|
||||||
if let Err(err) = self.inner.send_websocket_message(response).await {
|
if let Err(err) = self.inner.send_websocket_message(response).await {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to send message over websocket: {err}. Assuming the connection is dead.",
|
"Failed to send message over websocket: {err}. Assuming the connection is dead.",
|
||||||
@@ -397,13 +407,17 @@ where
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
},
|
},
|
||||||
mix_messages = self.mix_receiver.next() => {
|
mix_messages = self.mix_receiver.next() => {
|
||||||
|
//let span = info_span!("Processing mixnet message");
|
||||||
|
//let guard = span.enter();
|
||||||
let mix_messages = mix_messages.expect("sender was unexpectedly closed! this shouldn't have ever happened!");
|
let mix_messages = mix_messages.expect("sender was unexpectedly closed! this shouldn't have ever happened!");
|
||||||
if let Err(err) = self.inner.push_packets_to_client(self.client.shared_keys, mix_messages).await {
|
if let Err(err) = self.inner.push_packets_to_client(self.client.shared_keys, mix_messages).await {
|
||||||
warn!("failed to send the unwrapped sphinx packets back to the client - {err}, assuming the connection is dead");
|
warn!("failed to send the unwrapped sphinx packets back to the client - {err}, assuming the connection is dead");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
//drop(guard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use log::*;
|
|
||||||
use nym_coconut_interface::Credential;
|
use nym_coconut_interface::Credential;
|
||||||
|
use coconut_interface::Credential;
|
||||||
|
use tracing::*;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
use validator_client::nyxd::traits::DkgQueryClient;
|
use validator_client::nyxd::traits::DkgQueryClient;
|
||||||
use validator_client::{
|
use validator_client::{
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ use gateway_requests::registration::handshake::error::HandshakeError;
|
|||||||
use gateway_requests::registration::handshake::{gateway_handshake, SharedKeys};
|
use gateway_requests::registration::handshake::{gateway_handshake, SharedKeys};
|
||||||
use gateway_requests::types::{ClientControlRequest, ServerResponse};
|
use gateway_requests::types::{ClientControlRequest, ServerResponse};
|
||||||
use gateway_requests::{BinaryResponse, PROTOCOL_VERSION};
|
use gateway_requests::{BinaryResponse, PROTOCOL_VERSION};
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use mixnet_client::forwarder::MixForwardingSender;
|
use mixnet_client::forwarder::MixForwardingSender;
|
||||||
use nym_crypto::asymmetric::identity;
|
use nym_crypto::asymmetric::identity;
|
||||||
use nym_sphinx::DestinationAddressBytes;
|
use nym_sphinx::DestinationAddressBytes;
|
||||||
@@ -111,6 +111,7 @@ where
|
|||||||
|
|
||||||
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
|
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
|
||||||
/// to the framed WebSocket.
|
/// to the framed WebSocket.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
|
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
@@ -134,6 +135,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
|
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
async fn perform_registration_handshake(
|
async fn perform_registration_handshake(
|
||||||
&mut self,
|
&mut self,
|
||||||
init_msg: Vec<u8>,
|
init_msg: Vec<u8>,
|
||||||
@@ -157,6 +159,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Attempts to read websocket message from the associated socket.
|
/// Attempts to read websocket message from the associated socket.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
|
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
@@ -172,6 +175,7 @@ where
|
|||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `msg`: WebSocket message to write back to the client.
|
/// * `msg`: WebSocket message to write back to the client.
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
pub(crate) async fn send_websocket_message(&mut self, msg: Message) -> Result<(), WsError>
|
pub(crate) async fn send_websocket_message(&mut self, msg: Message) -> Result<(), WsError>
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin,
|
S: AsyncRead + AsyncWrite + Unpin,
|
||||||
@@ -192,6 +196,7 @@ where
|
|||||||
///
|
///
|
||||||
/// * `shared_keys`: keys derived between the client and gateway.
|
/// * `shared_keys`: keys derived between the client and gateway.
|
||||||
/// * `packets`: unwrapped packets that are to be pushed back to the client.
|
/// * `packets`: unwrapped packets that are to be pushed back to the client.
|
||||||
|
#[instrument(level="debug", skip_all, fields(packets = packets.len()))]
|
||||||
pub(crate) async fn push_packets_to_client(
|
pub(crate) async fn push_packets_to_client(
|
||||||
&mut self,
|
&mut self,
|
||||||
shared_keys: SharedKeys,
|
shared_keys: SharedKeys,
|
||||||
@@ -202,6 +207,7 @@ where
|
|||||||
{
|
{
|
||||||
// note: into_ws_message encrypts the requests and adds a MAC on it. Perhaps it should
|
// note: into_ws_message encrypts the requests and adds a MAC on it. Perhaps it should
|
||||||
// be more explicit in the naming?
|
// be more explicit in the naming?
|
||||||
|
debug!("Pushing {} packets", packets.len());
|
||||||
let messages: Vec<Result<Message, WsError>> = packets
|
let messages: Vec<Result<Message, WsError>> = packets
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|received_message| {
|
.map(|received_message| {
|
||||||
@@ -561,6 +567,7 @@ where
|
|||||||
/// result in client getting registered or authenticated. All other requests, such as forwarding
|
/// result in client getting registered or authenticated. All other requests, such as forwarding
|
||||||
/// sphinx packets considered an error and terminate the connection.
|
/// sphinx packets considered an error and terminate the connection.
|
||||||
// TODO: somehow cleanup this method
|
// TODO: somehow cleanup this method
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
pub(crate) async fn perform_initial_authentication(
|
pub(crate) async fn perform_initial_authentication(
|
||||||
mut self,
|
mut self,
|
||||||
) -> Option<AuthenticatedHandler<R, S, St>>
|
) -> Option<AuthenticatedHandler<R, S, St>>
|
||||||
@@ -597,6 +604,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(auth_result) => {
|
Ok(auth_result) => {
|
||||||
|
debug!("Confirming auth");
|
||||||
if let Err(err) = self
|
if let Err(err) = self
|
||||||
.send_websocket_message(auth_result.server_response.into())
|
.send_websocket_message(auth_result.server_response.into())
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -4,9 +4,9 @@
|
|||||||
use crate::node::storage::Storage;
|
use crate::node::storage::Storage;
|
||||||
use gateway_requests::registration::handshake::SharedKeys;
|
use gateway_requests::registration::handshake::SharedKeys;
|
||||||
use gateway_requests::ServerResponse;
|
use gateway_requests::ServerResponse;
|
||||||
use log::{trace, warn};
|
|
||||||
use nym_sphinx::DestinationAddressBytes;
|
use nym_sphinx::DestinationAddressBytes;
|
||||||
use nym_task::TaskClient;
|
use nym_task::TaskClient;
|
||||||
|
use tracing::*;
|
||||||
use rand::{CryptoRng, Rng};
|
use rand::{CryptoRng, Rng};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_tungstenite::WebSocketStream;
|
use tokio_tungstenite::WebSocketStream;
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use crate::node::client_handling::active_clients::ActiveClientsStore;
|
|||||||
use crate::node::client_handling::websocket::connection_handler::coconut::CoconutVerifier;
|
use crate::node::client_handling::websocket::connection_handler::coconut::CoconutVerifier;
|
||||||
use crate::node::client_handling::websocket::connection_handler::FreshHandler;
|
use crate::node::client_handling::websocket::connection_handler::FreshHandler;
|
||||||
use crate::node::storage::Storage;
|
use crate::node::storage::Storage;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use mixnet_client::forwarder::MixForwardingSender;
|
use mixnet_client::forwarder::MixForwardingSender;
|
||||||
use nym_crypto::asymmetric::identity;
|
use nym_crypto::asymmetric::identity;
|
||||||
use rand::rngs::OsRng;
|
use rand::rngs::OsRng;
|
||||||
@@ -60,7 +60,7 @@ impl Listener {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
log::trace!("client_handling::Listener: received shutdown");
|
trace!("client_handling::Listener: received shutdown");
|
||||||
}
|
}
|
||||||
connection = tcp_listener.accept() => {
|
connection = tcp_listener.accept() => {
|
||||||
match connection {
|
match connection {
|
||||||
@@ -79,7 +79,7 @@ impl Listener {
|
|||||||
Arc::clone(&self.coconut_verifier),
|
Arc::clone(&self.coconut_verifier),
|
||||||
);
|
);
|
||||||
let shutdown = shutdown.clone();
|
let shutdown = shutdown.clone();
|
||||||
tokio::spawn(async move { handle.start_handling(shutdown).await });
|
tokio::spawn(async move { handle.start_handling(shutdown).await }.instrument(info_span!("Connection handling", address = %remote_addr)));
|
||||||
}
|
}
|
||||||
Err(err) => warn!("failed to get client: {err}"),
|
Err(err) => warn!("failed to get client: {err}"),
|
||||||
}
|
}
|
||||||
@@ -102,6 +102,6 @@ impl Listener {
|
|||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
self.run(outbound_mix_sender, storage, active_clients_store, shutdown)
|
self.run(outbound_mix_sender, storage, active_clients_store, shutdown)
|
||||||
.await
|
.await
|
||||||
})
|
}.instrument(info_span!("Client Listener")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use crate::node::mixnet_handling::receiver::packet_processing::PacketProcessor;
|
|||||||
use crate::node::storage::error::StorageError;
|
use crate::node::storage::error::StorageError;
|
||||||
use crate::node::storage::Storage;
|
use crate::node::storage::Storage;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use mixnet_client::forwarder::MixForwardingSender;
|
use mixnet_client::forwarder::MixForwardingSender;
|
||||||
use mixnode_common::packet_processor::processor::ProcessedFinalHop;
|
use mixnode_common::packet_processor::processor::ProcessedFinalHop;
|
||||||
use nym_sphinx::forwarding::packet::MixPacket;
|
use nym_sphinx::forwarding::packet::MixPacket;
|
||||||
@@ -86,7 +86,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn try_push_message_to_client(
|
fn try_push_message_to_client(
|
||||||
&mut self,
|
&mut self,
|
||||||
client_address: DestinationAddressBytes,
|
client_address: DestinationAddressBytes,
|
||||||
@@ -105,7 +105,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
pub(crate) async fn store_processed_packet_payload(
|
pub(crate) async fn store_processed_packet_payload(
|
||||||
&self,
|
&self,
|
||||||
client_address: DestinationAddressBytes,
|
client_address: DestinationAddressBytes,
|
||||||
@@ -118,7 +118,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
|
|
||||||
self.storage.store_message(client_address, message).await
|
self.storage.store_message(client_address, message).await
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
|
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
|
||||||
if let Some(forward_ack) = forward_ack {
|
if let Some(forward_ack) = forward_ack {
|
||||||
trace!(
|
trace!(
|
||||||
@@ -130,7 +130,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
self.ack_sender.unbounded_send(forward_ack).unwrap();
|
self.ack_sender.unbounded_send(forward_ack).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all)]
|
||||||
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
|
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
|
||||||
let client_address = processed_final_hop.destination;
|
let client_address = processed_final_hop.destination;
|
||||||
let message = processed_final_hop.message;
|
let message = processed_final_hop.message;
|
||||||
@@ -154,7 +154,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
// received ack back into the network
|
// received ack back into the network
|
||||||
self.forward_ack(forward_ack, client_address);
|
self.forward_ack(forward_ack, client_address);
|
||||||
}
|
}
|
||||||
|
//#[instrument(level="info", skip_all)]
|
||||||
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedSphinxPacket) {
|
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedSphinxPacket) {
|
||||||
//
|
//
|
||||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||||
@@ -187,7 +187,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
log::trace!("ConnectionHandler: received shutdown");
|
trace!("ConnectionHandler: received shutdown");
|
||||||
}
|
}
|
||||||
Some(framed_sphinx_packet) = framed_conn.next() => {
|
Some(framed_sphinx_packet) = framed_conn.next() => {
|
||||||
match framed_sphinx_packet {
|
match framed_sphinx_packet {
|
||||||
|
|||||||
@@ -3,8 +3,8 @@
|
|||||||
|
|
||||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||||
use crate::node::storage::Storage;
|
use crate::node::storage::Storage;
|
||||||
use log::*;
|
|
||||||
use nym_task::TaskClient;
|
use nym_task::TaskClient;
|
||||||
|
use tracing::*;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::process;
|
use std::process;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
@@ -37,13 +37,14 @@ impl Listener {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
log::trace!("mixnet_handling::Listener: Received shutdown");
|
trace!("mixnet_handling::Listener: Received shutdown");
|
||||||
}
|
}
|
||||||
connection = tcp_listener.accept() => {
|
connection = tcp_listener.accept() => {
|
||||||
match connection {
|
match connection {
|
||||||
Ok((socket, remote_addr)) => {
|
Ok((socket, remote_addr)) => {
|
||||||
let handler = connection_handler.clone();
|
let handler = connection_handler.clone();
|
||||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone())
|
||||||
|
.instrument(info_span!("Mixnet connection handling", address = %remote_addr)));
|
||||||
}
|
}
|
||||||
Err(err) => warn!("failed to get client: {err}"),
|
Err(err) => warn!("failed to get client: {err}"),
|
||||||
}
|
}
|
||||||
@@ -58,6 +59,6 @@ impl Listener {
|
|||||||
{
|
{
|
||||||
info!("Running mix listener on {:?}", self.address.to_string());
|
info!("Running mix listener on {:?}", self.address.to_string());
|
||||||
|
|
||||||
tokio::spawn(async move { self.run(connection_handler).await })
|
tokio::spawn(async move { self.run(connection_handler).await }.instrument(info_span!("Mixnet Listener")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ use crate::node::statistics::collector::GatewayStatisticsCollector;
|
|||||||
use crate::node::storage::Storage;
|
use crate::node::storage::Storage;
|
||||||
use crate::{commands::sign::load_identity_keys, OutputFormat};
|
use crate::{commands::sign::load_identity_keys, OutputFormat};
|
||||||
use colored::Colorize;
|
use colored::Colorize;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
use mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
|
use mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
|
||||||
use nym_crypto::asymmetric::{encryption, identity};
|
use nym_crypto::asymmetric::{encryption, identity};
|
||||||
use nym_network_defaults::NymNetworkDetails;
|
use nym_network_defaults::NymNetworkDetails;
|
||||||
@@ -220,8 +220,7 @@ where
|
|||||||
self.config.get_use_legacy_sphinx_framing(),
|
self.config.get_use_legacy_sphinx_framing(),
|
||||||
shutdown,
|
shutdown,
|
||||||
);
|
);
|
||||||
|
tokio::spawn(async move { packet_forwarder.run().await }.instrument(info_span!("Packet Forwarder")));
|
||||||
tokio::spawn(async move { packet_forwarder.run().await });
|
|
||||||
packet_sender
|
packet_sender
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -230,7 +229,7 @@ where
|
|||||||
shutdown: TaskManager,
|
shutdown: TaskManager,
|
||||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
let res = shutdown.catch_interrupt().await;
|
let res = shutdown.catch_interrupt().await;
|
||||||
log::info!("Stopping nym gateway");
|
info!("Stopping nym gateway");
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -36,6 +36,12 @@ tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
|||||||
tokio-util = { version="0.7.3", features = ["codec"] }
|
tokio-util = { version="0.7.3", features = ["codec"] }
|
||||||
toml = "0.5.8"
|
toml = "0.5.8"
|
||||||
url = { version = "2.2", features = ["serde"] }
|
url = { version = "2.2", features = ["serde"] }
|
||||||
|
tracing = "0.1.37"
|
||||||
|
tracing-subscriber = {version = "0.3.16", features = ["registry", "std", "env-filter"]}
|
||||||
|
tracing-opentelemetry = "0.18.0"
|
||||||
|
opentelemetry-jaeger = {version = "0.17.0", features = ["collector_client","rt-tokio","isahc_collector_client"]}
|
||||||
|
opentelemetry = {version = "0.18.0", features = ["rt-tokio"]}
|
||||||
|
tracing-flame = "0.2.0"
|
||||||
atty = "0.2"
|
atty = "0.2"
|
||||||
|
|
||||||
## internal
|
## internal
|
||||||
|
|||||||
+30
-1
@@ -7,6 +7,9 @@ extern crate rocket;
|
|||||||
use ::nym_config::defaults::setup_env;
|
use ::nym_config::defaults::setup_env;
|
||||||
use clap::{crate_name, crate_version, Parser, ValueEnum};
|
use clap::{crate_name, crate_version, Parser, ValueEnum};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
|
use tracing_subscriber::{EnvFilter, Registry, filter};
|
||||||
|
use tracing_flame::FlameLayer;
|
||||||
use nym_bin_common::logging::setup_logging;
|
use nym_bin_common::logging::setup_logging;
|
||||||
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
|
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
|
||||||
|
|
||||||
@@ -62,7 +65,32 @@ impl Cli {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
setup_logging();
|
//setup_logging();
|
||||||
|
|
||||||
|
//let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||||
|
//.with_endpoint("143.42.21.138:6831")
|
||||||
|
//.with_service_name("nym_mixnode1")
|
||||||
|
//.with_auto_split_batch(true)
|
||||||
|
//.install_batch(opentelemetry::runtime::Tokio)
|
||||||
|
//.expect("Failed to initialize tracer");
|
||||||
|
|
||||||
|
//let jaeger_layer = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
|
let hyper_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("hyper")});
|
||||||
|
let tokio_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("tokio")});
|
||||||
|
//let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded").unwrap();
|
||||||
|
|
||||||
|
let subscriber = Registry::default()
|
||||||
|
.with(EnvFilter::from_default_env())
|
||||||
|
.with(hyper_filter)
|
||||||
|
.with(tokio_filter)
|
||||||
|
.with(tracing_subscriber::fmt::layer().pretty());
|
||||||
|
//.with(flame_layer);
|
||||||
|
//.with(jaeger_layer);
|
||||||
|
|
||||||
|
|
||||||
|
tracing::subscriber::set_global_default(subscriber)
|
||||||
|
.expect("Failed to set global subscriber");
|
||||||
|
|
||||||
if atty::is(atty::Stream::Stdout) {
|
if atty::is(atty::Stream::Stdout) {
|
||||||
println!("{}", banner(crate_name!(), crate_version!()));
|
println!("{}", banner(crate_name!(), crate_version!()));
|
||||||
}
|
}
|
||||||
@@ -70,6 +98,7 @@ async fn main() {
|
|||||||
let args = Cli::parse();
|
let args = Cli::parse();
|
||||||
setup_env(args.config_env_file.as_ref());
|
setup_env(args.config_env_file.as_ref());
|
||||||
commands::execute(args).await;
|
commands::execute(args).await;
|
||||||
|
opentelemetry::global::shutdown_tracer_provider();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -7,8 +7,10 @@ use crate::node::listener::connection_handler::packet_processing::{
|
|||||||
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
|
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
|
||||||
use crate::node::TaskClient;
|
use crate::node::TaskClient;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::{error, info};
|
use tracing::{error, info, trace, debug, warn};
|
||||||
|
use tracing::*;
|
||||||
use nym_sphinx::forwarding::packet::MixPacket;
|
use nym_sphinx::forwarding::packet::MixPacket;
|
||||||
|
use nym_sphinx::params::PacketSize;
|
||||||
use nym_sphinx::framing::codec::SphinxCodec;
|
use nym_sphinx::framing::codec::SphinxCodec;
|
||||||
use nym_sphinx::framing::packet::FramedSphinxPacket;
|
use nym_sphinx::framing::packet::FramedSphinxPacket;
|
||||||
use nym_sphinx::Delay as SphinxDelay;
|
use nym_sphinx::Delay as SphinxDelay;
|
||||||
@@ -16,6 +18,7 @@ use std::net::SocketAddr;
|
|||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
use tokio_util::codec::Framed;
|
use tokio_util::codec::Framed;
|
||||||
|
use std::time::{SystemTime,UNIX_EPOCH};
|
||||||
|
|
||||||
pub(crate) mod packet_processing;
|
pub(crate) mod packet_processing;
|
||||||
|
|
||||||
@@ -35,10 +38,12 @@ impl ConnectionHandler {
|
|||||||
delay_forwarding_channel,
|
delay_forwarding_channel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all, "Sending to packet forwarder", fields(packet_size))]
|
||||||
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
|
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
|
||||||
// determine instant at which packet should get forwarded. this way we minimise effect of
|
// determine instant at which packet should get forwarded. this way we minimise effect of
|
||||||
// being stuck in the queue [of the channel] to get inserted into the delay queue
|
// being stuck in the queue [of the channel] to get inserted into the delay queue
|
||||||
|
let packet_size = PacketSize::get_type(mix_packet.sphinx_packet().len()).unwrap();
|
||||||
|
Span::current().record("packet_size", field::debug(packet_size));
|
||||||
let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration());
|
let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration());
|
||||||
|
|
||||||
// if unbounded_send() failed it means that the receiver channel was disconnected
|
// if unbounded_send() failed it means that the receiver channel was disconnected
|
||||||
@@ -47,7 +52,7 @@ impl ConnectionHandler {
|
|||||||
.unbounded_send((mix_packet, forward_instant))
|
.unbounded_send((mix_packet, forward_instant))
|
||||||
.expect("the delay-forwarder has died!");
|
.expect("the delay-forwarder has died!");
|
||||||
}
|
}
|
||||||
|
#[instrument(level="info", skip_all, "Handling packet",fields(packet_size=?framed_sphinx_packet.packet_size()))]
|
||||||
fn handle_received_packet(&self, framed_sphinx_packet: FramedSphinxPacket) {
|
fn handle_received_packet(&self, framed_sphinx_packet: FramedSphinxPacket) {
|
||||||
//
|
//
|
||||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||||
@@ -69,7 +74,7 @@ impl ConnectionHandler {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[instrument(level="info", skip_all, "Connection handling", fields(address=%remote))]
|
||||||
pub(crate) async fn handle_connection(
|
pub(crate) async fn handle_connection(
|
||||||
self,
|
self,
|
||||||
conn: TcpStream,
|
conn: TcpStream,
|
||||||
@@ -83,7 +88,7 @@ impl ConnectionHandler {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
log::trace!("ConnectionHandler: received shutdown");
|
trace!("ConnectionHandler: received shutdown");
|
||||||
}
|
}
|
||||||
Some(framed_sphinx_packet) = framed_conn.next() => {
|
Some(framed_sphinx_packet) = framed_conn.next() => {
|
||||||
match framed_sphinx_packet {
|
match framed_sphinx_packet {
|
||||||
@@ -96,7 +101,9 @@ impl ConnectionHandler {
|
|||||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||||
// but we already handle multiple concurrent connections so if anything, making
|
// but we already handle multiple concurrent connections so if anything, making
|
||||||
// that change would only slow things down
|
// that change would only slow things down
|
||||||
|
//println!("{:?}_In_{:?}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros(), framed_sphinx_packet.packet_size().size());
|
||||||
self.handle_received_packet(framed_sphinx_packet);
|
self.handle_received_packet(framed_sphinx_packet);
|
||||||
|
//println!("{:?}_Processed_{:?}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros(), mix_packet.sphinx_packet().len());
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(
|
error!(
|
||||||
@@ -113,6 +120,6 @@ impl ConnectionHandler {
|
|||||||
"Closing connection from {:?}",
|
"Closing connection from {:?}",
|
||||||
framed_conn.into_inner().peer_addr()
|
framed_conn.into_inner().peer_addr()
|
||||||
);
|
);
|
||||||
log::trace!("ConnectionHandler: Exiting");
|
trace!("ConnectionHandler: Exiting");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,8 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::node::listener::connection_handler::ConnectionHandler;
|
use crate::node::listener::connection_handler::ConnectionHandler;
|
||||||
use log::error;
|
use tracing::*;
|
||||||
|
use tracing::{error, info, trace, warn};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::process;
|
use std::process;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
@@ -21,9 +22,9 @@ impl Listener {
|
|||||||
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
|
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
|
||||||
Listener { address, shutdown }
|
Listener { address, shutdown }
|
||||||
}
|
}
|
||||||
|
#[instrument(level="info", skip_all, "Mixnet Listener")]
|
||||||
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
||||||
log::trace!("Starting Listener");
|
trace!("Starting Listener");
|
||||||
let listener = match TcpListener::bind(self.address).await {
|
let listener = match TcpListener::bind(self.address).await {
|
||||||
Ok(listener) => listener,
|
Ok(listener) => listener,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -36,20 +37,20 @@ impl Listener {
|
|||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
log::trace!("Listener: Received shutdown");
|
trace!("Listener: Received shutdown");
|
||||||
}
|
}
|
||||||
connection = listener.accept() => {
|
connection = listener.accept() => {
|
||||||
match connection {
|
match connection {
|
||||||
Ok((socket, remote_addr)) => {
|
Ok((socket, remote_addr)) => {
|
||||||
let handler = connection_handler.clone();
|
let handler = connection_handler.clone();
|
||||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()).instrument(info_span!("Connection handling", address = %remote_addr)));
|
||||||
}
|
}
|
||||||
Err(err) => warn!("Failed to accept incoming connection - {err}"),
|
Err(err) => warn!("Failed to accept incoming connection - {err}"),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
log::trace!("Listener: Exiting");
|
trace!("Listener: Exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn start(mut self, connection_handler: ConnectionHandler) -> JoinHandle<()> {
|
pub(crate) fn start(mut self, connection_handler: ConnectionHandler) -> JoinHandle<()> {
|
||||||
|
|||||||
@@ -17,9 +17,10 @@ use crate::node::listener::Listener;
|
|||||||
use crate::node::node_description::NodeDescription;
|
use crate::node::node_description::NodeDescription;
|
||||||
use crate::node::node_statistics::SharedNodeStats;
|
use crate::node::node_statistics::SharedNodeStats;
|
||||||
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
|
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
|
||||||
|
use tracing::*;
|
||||||
|
use tracing::{info, error, warn};
|
||||||
use crate::OutputFormat;
|
use crate::OutputFormat;
|
||||||
use colored::Colorize;
|
use colored::Colorize;
|
||||||
use log::{error, info, warn};
|
|
||||||
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
|
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
|
||||||
use nym_bin_common::version_checker::parse_version;
|
use nym_bin_common::version_checker::parse_version;
|
||||||
use nym_config::NymConfig;
|
use nym_config::NymConfig;
|
||||||
@@ -215,7 +216,7 @@ impl MixNode {
|
|||||||
|
|
||||||
let packet_sender = packet_forwarder.sender();
|
let packet_sender = packet_forwarder.sender();
|
||||||
|
|
||||||
tokio::spawn(async move { packet_forwarder.run().await });
|
tokio::spawn(async move { packet_forwarder.run().await }.instrument(info_span!("Packet delay forwarder")));
|
||||||
packet_sender
|
packet_sender
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -294,7 +295,7 @@ impl MixNode {
|
|||||||
|
|
||||||
async fn wait_for_interrupt(&self, shutdown: TaskManager) {
|
async fn wait_for_interrupt(&self, shutdown: TaskManager) {
|
||||||
let _res = shutdown.catch_interrupt().await;
|
let _res = shutdown.catch_interrupt().await;
|
||||||
log::info!("Stopping nym mixnode");
|
info!("Stopping nym mixnode");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) {
|
pub async fn run(&mut self) {
|
||||||
@@ -304,7 +305,7 @@ impl MixNode {
|
|||||||
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
|
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
|
||||||
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
|
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
|
||||||
} else {
|
} else {
|
||||||
log::error!(
|
error!(
|
||||||
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
|
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
|
||||||
duplicate_node_key
|
duplicate_node_key
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -4,10 +4,13 @@
|
|||||||
use crate::node::node_statistics::UpdateSender;
|
use crate::node::node_statistics::UpdateSender;
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use nym_sphinx::params::packet_sizes::PacketSize;
|
||||||
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue};
|
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue};
|
||||||
use nym_sphinx::forwarding::packet::MixPacket;
|
use nym_sphinx::forwarding::packet::MixPacket;
|
||||||
use std::io;
|
use std::io;
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
|
use tracing::*;
|
||||||
|
use tracing::{trace};
|
||||||
|
|
||||||
use super::TaskClient;
|
use super::TaskClient;
|
||||||
|
|
||||||
@@ -55,11 +58,13 @@ where
|
|||||||
pub(crate) fn sender(&self) -> PacketDelayForwardSender {
|
pub(crate) fn sender(&self) -> PacketDelayForwardSender {
|
||||||
self.packet_sender.clone()
|
self.packet_sender.clone()
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all, "Forwarding packet", fields(packet_size))]
|
||||||
fn forward_packet(&mut self, packet: MixPacket) {
|
fn forward_packet(&mut self, packet: MixPacket) {
|
||||||
let next_hop = packet.next_hop();
|
let next_hop = packet.next_hop();
|
||||||
let packet_mode = packet.packet_mode();
|
let packet_mode = packet.packet_mode();
|
||||||
let sphinx_packet = packet.into_sphinx_packet();
|
let sphinx_packet = packet.into_sphinx_packet();
|
||||||
|
let packet_size = PacketSize::get_type(sphinx_packet.len()).unwrap();
|
||||||
|
Span::current().record("packet_size", field::debug(packet_size));
|
||||||
|
|
||||||
if let Err(err) =
|
if let Err(err) =
|
||||||
self.mixnet_client
|
self.mixnet_client
|
||||||
@@ -87,7 +92,7 @@ where
|
|||||||
let delayed_packet = packet.into_inner();
|
let delayed_packet = packet.into_inner();
|
||||||
self.forward_packet(delayed_packet)
|
self.forward_packet(delayed_packet)
|
||||||
}
|
}
|
||||||
|
#[instrument(level="debug", skip_all, "Handling packet", fields(packet_size))]
|
||||||
fn handle_new_packet(&mut self, new_packet: (MixPacket, Option<Instant>)) {
|
fn handle_new_packet(&mut self, new_packet: (MixPacket, Option<Instant>)) {
|
||||||
// in case of a zero delay packet, don't bother putting it in the delay queue,
|
// in case of a zero delay packet, don't bother putting it in the delay queue,
|
||||||
// just forward it immediately
|
// just forward it immediately
|
||||||
@@ -105,7 +110,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn run(&mut self) {
|
pub(crate) async fn run(&mut self) {
|
||||||
log::trace!("Starting DelayForwarder");
|
trace!("Starting DelayForwarder");
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
delayed = self.delay_queue.next() => {
|
delayed = self.delay_queue.next() => {
|
||||||
@@ -117,12 +122,12 @@ where
|
|||||||
self.handle_new_packet(new_packet.unwrap())
|
self.handle_new_packet(new_packet.unwrap())
|
||||||
}
|
}
|
||||||
_ = self.shutdown.recv() => {
|
_ = self.shutdown.recv() => {
|
||||||
log::trace!("DelayForwarder: Received shutdown");
|
trace!("DelayForwarder: Received shutdown");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log::trace!("DelayForwarder: Exiting");
|
trace!("DelayForwarder: Exiting");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -127,8 +127,8 @@ impl RewardedSetUpdater {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reward all the nodes in the still current, soon to be previous rewarded set
|
// Reward all the nodes in the still current, soon to be previous rewarded set
|
||||||
log::info!("Rewarding the current rewarded set...");
|
//log::info!("Rewarding the current rewarded set...");
|
||||||
self.reward_current_rewarded_set(interval).await?;
|
//self.reward_current_rewarded_set(interval).await?;
|
||||||
|
|
||||||
// note: those operations don't really have to be atomic, so it's fine to send them
|
// note: those operations don't really have to be atomic, so it's fine to send them
|
||||||
// as separate transactions
|
// as separate transactions
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ pub async fn get_rewarded_set_detailed(
|
|||||||
#[openapi(tag = "contract-cache")]
|
#[openapi(tag = "contract-cache")]
|
||||||
#[get("/mixnodes/active")]
|
#[get("/mixnodes/active")]
|
||||||
pub async fn get_active_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
|
pub async fn get_active_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
|
||||||
Json(cache.active_set().await.value)
|
Json(cache.mixnodes_filtered().await)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated,
|
// DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated,
|
||||||
|
|||||||
Reference in New Issue
Block a user