Compare commits

...

22 Commits

Author SHA1 Message Date
Simon Wicky edfb75b9ef remove test client 2023-03-28 16:12:21 +02:00
Simon Wicky 298591b9e0 Merge commit '009b2131f61dc447ad08b5f1df1e2eef44cde1d2' into simon/instrumented 2023-03-28 13:45:37 +00:00
Simon Wicky 009b2131f6 instrumentation 2023-03-28 13:24:45 +00:00
Simon Wicky e5af0f5d5e Merge commit '3cad2dbb341c23592d22856ced5c8020a7ea20b7' into simon/instrumented 2023-03-28 13:21:46 +00:00
Simon Wicky 3cad2dbb34 Merge remote-tracking branch 'origin/simon/instrumented' into simon/instrumented 2023-03-28 13:18:53 +00:00
Simon Wicky e95e33cd70 Merge commit '6d44fe818ea4c74f476cc6d79434cc8619b45c0c' into simon/instrumented 2023-03-28 13:17:09 +00:00
Simon Wicky 5281895d5b mixnode instrumentation 2023-03-28 12:39:06 +00:00
Simon Wicky 263db0dbc3 Merge commit '6d44fe818ea4c74f476cc6d79434cc8619b45c0c' into Simon/1.1.4 2023-03-28 11:55:06 +00:00
Simon Wicky 6d44fe818e Merge commit '2878e9be9d0c397a746a8c942b818ac1168dff6f' into simon 2023-03-28 11:50:17 +02:00
Simon Wicky b8ec48cf07 small changes to warmup phase 2023-03-28 09:41:35 +00:00
Simon Wicky 99227e837c disable some epoch operations 2023-03-23 07:52:39 +00:00
Simon Wicky 34cb142595 packet size the end 2023-03-22 08:40:33 +00:00
Simon Wicky 2988ae4459 packet size in config 2023-03-10 14:16:53 +00:00
Simon Wicky e653b632ba packet sizes support 2023-03-10 13:18:19 +00:00
Simon Wicky eb216a06b3 redirect mixnodes/active to mixnodes 2023-03-10 13:18:15 +00:00
Simon Wicky 921f01a789 Merge commit 'f4d0a120bb7f552591c415ece31d9f75bd6ec33e' into Simon/1.1.5 2023-01-31 09:31:24 +00:00
Simon Wicky 27bf8b2e00 tracing first part 2023-01-31 09:25:15 +00:00
Simon Wicky e82a669cd3 merge 1.1.5 2023-01-11 15:17:15 +00:00
Simon Wicky 2015443a90 set default packetsize back to 2 2023-01-10 10:23:51 +00:00
Simon Wicky 2c2823f9e1 test instrumentation 2022-12-21 08:31:16 +00:00
Simon Wicky 52b41a5697 add packet sizes support 2022-12-21 08:06:23 +00:00
Simon Wicky 9ee6ae44e2 remove override config, and active nodes 2022-12-21 08:06:13 +00:00
32 changed files with 1045 additions and 547 deletions
Generated
+731 -473
View File
File diff suppressed because it is too large Load Diff
@@ -333,7 +333,7 @@ where
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> { fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel // The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected. // (mix_tx) was detected.
self.adjust_current_average_message_sending_delay(); //self.adjust_current_average_message_sending_delay();
let avg_delay = self.current_average_message_sending_delay(); let avg_delay = self.current_average_message_sending_delay();
// Start by checking if we have any incoming messages about closed connections // Start by checking if we have any incoming messages about closed connections
+22
View File
@@ -697,6 +697,17 @@ pub enum ExtendedPacketSize {
Extended8, Extended8,
Extended16, Extended16,
Extended32, Extended32,
Extended10,
Extended15,
Extended20,
Extended25,
Extended50,
Extended100,
Extended150,
Extended200,
Extended250,
Extended500,
} }
impl Default for DebugConfig { impl Default for DebugConfig {
@@ -734,6 +745,17 @@ impl From<ExtendedPacketSize> for PacketSize {
ExtendedPacketSize::Extended8 => PacketSize::ExtendedPacket8, ExtendedPacketSize::Extended8 => PacketSize::ExtendedPacket8,
ExtendedPacketSize::Extended16 => PacketSize::ExtendedPacket16, ExtendedPacketSize::Extended16 => PacketSize::ExtendedPacket16,
ExtendedPacketSize::Extended32 => PacketSize::ExtendedPacket32, ExtendedPacketSize::Extended32 => PacketSize::ExtendedPacket32,
ExtendedPacketSize::Extended10 => PacketSize::ExtendedPacket10,
ExtendedPacketSize::Extended15 => PacketSize::ExtendedPacket15,
ExtendedPacketSize::Extended20 => PacketSize::ExtendedPacket20,
ExtendedPacketSize::Extended25 => PacketSize::ExtendedPacket25,
ExtendedPacketSize::Extended50 => PacketSize::ExtendedPacket50,
ExtendedPacketSize::Extended100 => PacketSize::ExtendedPacket100,
ExtendedPacketSize::Extended150 => PacketSize::ExtendedPacket150,
ExtendedPacketSize::Extended200 => PacketSize::ExtendedPacket200,
ExtendedPacketSize::Extended250 => PacketSize::ExtendedPacket250,
ExtendedPacketSize::Extended500 => PacketSize::ExtendedPacket500,
} }
} }
} }
@@ -74,7 +74,7 @@ impl PacketRouter {
received_messages.push(received_packet); received_messages.push(received_packet);
} else { } else {
// this can happen if other clients are not padding their messages // this can happen if other clients are not padding their messages
warn!("Received message of unexpected size. Probably from an outdated client... len: {}", received_packet.len()); //warn!("Received message of unexpected size. Probably from an outdated client... len: {}", received_packet.len());
received_messages.push(received_packet); received_messages.push(received_packet);
} }
} }
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
futures = "0.3" futures = "0.3"
tracing = "0.1.37"
log = { workspace = true } log = { workspace = true }
tokio = { version = "1.24.1", features = ["time", "net", "rt"] } tokio = { version = "1.24.1", features = ["time", "net", "rt"] }
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
@@ -3,11 +3,12 @@
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::StreamExt; use futures::StreamExt;
use log::*; use tracing::*;
use nym_sphinx::framing::codec::SphinxCodec; use nym_sphinx::framing::codec::SphinxCodec;
use nym_sphinx::framing::packet::FramedSphinxPacket; use nym_sphinx::framing::packet::FramedSphinxPacket;
use nym_sphinx::params::PacketMode; use nym_sphinx::params::PacketMode;
use nym_sphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket}; use nym_sphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket};
use nym_sphinx::params::packet_sizes::PacketSize;
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -197,6 +198,7 @@ impl Client {
} }
impl SendWithoutResponse for Client { impl SendWithoutResponse for Client {
#[instrument(level="info", skip(self, packet), "Sending packet to mixnet", fields(packet_size))]
fn send_without_response( fn send_without_response(
&mut self, &mut self,
address: NymNodeRoutingAddress, address: NymNodeRoutingAddress,
@@ -204,13 +206,15 @@ impl SendWithoutResponse for Client {
packet_mode: PacketMode, packet_mode: PacketMode,
) -> io::Result<()> { ) -> io::Result<()> {
trace!("Sending packet to {:?}", address); trace!("Sending packet to {:?}", address);
let packet_size = PacketSize::get_type(packet.len()).unwrap();
Span::current().record("packet_size", field::debug(packet_size));
let framed_packet = let framed_packet =
FramedSphinxPacket::new(packet, packet_mode, self.config.use_legacy_version); FramedSphinxPacket::new(packet, packet_mode, self.config.use_legacy_version);
if let Some(sender) = self.conn_new.get_mut(&address) { if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) { if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() { if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address); info!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet // it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet // if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new( Err(io::Error::new(
@@ -4,8 +4,8 @@
use crate::client::{Client, Config, SendWithoutResponse}; use crate::client::{Client, Config, SendWithoutResponse};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::StreamExt; use futures::StreamExt;
use log::*;
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
use tracing::*;
use std::time::Duration; use std::time::Duration;
pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>; pub type MixForwardingSender = mpsc::UnboundedSender<MixPacket>;
@@ -53,10 +53,10 @@ impl PacketForwarder {
tokio::select! { tokio::select! {
biased; biased;
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
log::trace!("PacketForwarder: Received shutdown"); trace!("PacketForwarder: Received shutdown");
} }
Some(mix_packet) = self.packet_receiver.next() => { Some(mix_packet) = self.packet_receiver.next() => {
trace!("Going to forward packet to {:?}", mix_packet.next_hop()); trace!("Going to forward packet to {:?}", mix_packet.next_hop());
let next_hop = mix_packet.next_hop(); let next_hop = mix_packet.next_hop();
let packet_mode = mix_packet.packet_mode(); let packet_mode = mix_packet.packet_mode();
+1
View File
@@ -16,6 +16,7 @@ serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] } tokio = { version = "1.24.1", features = ["time", "macros", "rt", "net", "io-util"] }
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
url = "2.2" url = "2.2"
tracing = "0.1.37"
thiserror = "1.0.37" thiserror = "1.0.37"
nym-crypto = { path = "../crypto" } nym-crypto = { path = "../crypto" }
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::packet_processor::error::MixProcessingError; use crate::packet_processor::error::MixProcessingError;
use log::*; use tracing::*;
use nym_sphinx_acknowledgements::surb_ack::SurbAck; use nym_sphinx_acknowledgements::surb_ack::SurbAck;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress; use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_forwarding::packet::MixPacket; use nym_sphinx_forwarding::packet::MixPacket;
@@ -57,6 +57,7 @@ impl SphinxPacketProcessor {
} }
/// Takes the received framed packet and tries to unwrap it from the sphinx encryption. /// Takes the received framed packet and tries to unwrap it from the sphinx encryption.
#[instrument(level="debug", skip_all)]
fn perform_initial_unwrapping( fn perform_initial_unwrapping(
&self, &self,
received: FramedSphinxPacket, received: FramedSphinxPacket,
@@ -73,6 +74,7 @@ impl SphinxPacketProcessor {
/// Processed received forward hop packet - tries to extract next hop address, sets delay /// Processed received forward hop packet - tries to extract next hop address, sets delay
/// and packs all the data in a way that can be easily sent to the next hop. /// and packs all the data in a way that can be easily sent to the next hop.
#[instrument(level="debug", skip_all)]
fn process_forward_hop( fn process_forward_hop(
&self, &self,
packet: SphinxPacket, packet: SphinxPacket,
@@ -88,6 +90,7 @@ impl SphinxPacketProcessor {
/// Split data extracted from the final hop sphinx packet into a SURBAck and message /// Split data extracted from the final hop sphinx packet into a SURBAck and message
/// that should get delivered to a client. /// that should get delivered to a client.
#[instrument(level="debug", skip_all)]
fn split_hop_data_into_ack_and_message( fn split_hop_data_into_ack_and_message(
&self, &self,
mut extracted_data: Vec<u8>, mut extracted_data: Vec<u8>,
@@ -105,6 +108,7 @@ impl SphinxPacketProcessor {
/// Tries to extract a SURBAck that could be sent back into the mix network and message /// Tries to extract a SURBAck that could be sent back into the mix network and message
/// that should get delivered to a client from received Sphinx packet. /// that should get delivered to a client from received Sphinx packet.
#[instrument(level="debug", skip_all)]
fn split_into_ack_and_message( fn split_into_ack_and_message(
&self, &self,
data: Vec<u8>, data: Vec<u8>,
@@ -116,10 +120,7 @@ impl SphinxPacketProcessor {
trace!("received an ack packet!"); trace!("received an ack packet!");
Ok((None, data)) Ok((None, data))
} }
PacketSize::RegularPacket _ => {
| PacketSize::ExtendedPacket8
| PacketSize::ExtendedPacket16
| PacketSize::ExtendedPacket32 => {
trace!("received a normal packet!"); trace!("received a normal packet!");
let (ack_data, message) = self.split_hop_data_into_ack_and_message(data)?; let (ack_data, message) = self.split_hop_data_into_ack_and_message(data)?;
let (ack_first_hop, ack_packet) = SurbAck::try_recover_first_hop_packet(&ack_data)?; let (ack_first_hop, ack_packet) = SurbAck::try_recover_first_hop_packet(&ack_data)?;
@@ -132,6 +133,7 @@ impl SphinxPacketProcessor {
/// Processed received final hop packet - tries to extract SURBAck out of it (assuming the /// Processed received final hop packet - tries to extract SURBAck out of it (assuming the
/// packet itself is not an ACK) and splits it from the message that should get delivered /// packet itself is not an ACK) and splits it from the message that should get delivered
/// to the destination. /// to the destination.
#[instrument(level="debug", skip_all)]
fn process_final_hop( fn process_final_hop(
&self, &self,
destination: DestinationAddressBytes, destination: DestinationAddressBytes,
@@ -153,6 +155,7 @@ impl SphinxPacketProcessor {
/// Performs final processing for the unwrapped packet based on whether it was a forward hop /// Performs final processing for the unwrapped packet based on whether it was a forward hop
/// or a final hop. /// or a final hop.
#[instrument(level="debug", skip_all)]
fn perform_final_processing( fn perform_final_processing(
&self, &self,
packet: ProcessedPacket, packet: ProcessedPacket,
@@ -170,7 +173,7 @@ impl SphinxPacketProcessor {
} }
} }
} }
#[instrument(level="debug", skip_all, fields(packet_size=?received.packet_size()))]
pub fn process_received( pub fn process_received(
&self, &self,
received: FramedSphinxPacket, received: FramedSphinxPacket,
+83 -1
View File
@@ -20,6 +20,16 @@ const ACK_PACKET_SIZE: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + ACK_IV_SIZE
const EXTENDED_PACKET_SIZE_8: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 8 * 1024; const EXTENDED_PACKET_SIZE_8: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 8 * 1024;
const EXTENDED_PACKET_SIZE_16: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 16 * 1024; const EXTENDED_PACKET_SIZE_16: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 16 * 1024;
const EXTENDED_PACKET_SIZE_32: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 32 * 1024; const EXTENDED_PACKET_SIZE_32: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 32 * 1024;
const EXTENDED_PACKET_SIZE_10: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 10 * 1024;
const EXTENDED_PACKET_SIZE_15: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 15 * 1024;
const EXTENDED_PACKET_SIZE_20: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 20 * 1024;
const EXTENDED_PACKET_SIZE_25: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 25 * 1024;
const EXTENDED_PACKET_SIZE_50: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 50 * 1024;
const EXTENDED_PACKET_SIZE_100: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 100 * 1024;
const EXTENDED_PACKET_SIZE_150: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 150 * 1024;
const EXTENDED_PACKET_SIZE_200: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 200 * 1024;
const EXTENDED_PACKET_SIZE_250: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 250 * 1024;
const EXTENDED_PACKET_SIZE_500: usize = HEADER_SIZE + PAYLOAD_OVERHEAD_SIZE + 500 * 1024;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum InvalidPacketSize { pub enum InvalidPacketSize {
@@ -51,6 +61,18 @@ pub enum PacketSize {
// for example for streaming fast and furious in compressed XviD quality // for example for streaming fast and furious in compressed XviD quality
ExtendedPacket16 = 5, ExtendedPacket16 = 5,
ExtendedPacket10 = 6,
ExtendedPacket15 = 7,
ExtendedPacket20 = 8,
ExtendedPacket25 = 9,
ExtendedPacket50 = 10,
ExtendedPacket100 = 11,
ExtendedPacket150 = 12,
ExtendedPacket200 = 13,
ExtendedPacket250 = 14,
ExtendedPacket500 = 15,
} }
impl FromStr for PacketSize { impl FromStr for PacketSize {
@@ -63,6 +85,16 @@ impl FromStr for PacketSize {
"extended8" => Ok(Self::ExtendedPacket8), "extended8" => Ok(Self::ExtendedPacket8),
"extended16" => Ok(Self::ExtendedPacket16), "extended16" => Ok(Self::ExtendedPacket16),
"extended32" => Ok(Self::ExtendedPacket32), "extended32" => Ok(Self::ExtendedPacket32),
"extended10" => Ok(Self::ExtendedPacket10),
"extended15" => Ok(Self::ExtendedPacket15),
"extended20" => Ok(Self::ExtendedPacket20),
"extended25" => Ok(Self::ExtendedPacket25),
"extended50" => Ok(Self::ExtendedPacket50),
"extended100" => Ok(Self::ExtendedPacket100),
"extended150" => Ok(Self::ExtendedPacket150),
"extended200" => Ok(Self::ExtendedPacket200),
"extended250" => Ok(Self::ExtendedPacket250),
"extended500" => Ok(Self::ExtendedPacket500),
s => Err(InvalidPacketSize::UnknownExtendedPacketVariant { s => Err(InvalidPacketSize::UnknownExtendedPacketVariant {
received: s.to_string(), received: s.to_string(),
}), }),
@@ -80,6 +112,16 @@ impl TryFrom<u8> for PacketSize {
_ if value == (PacketSize::ExtendedPacket8 as u8) => Ok(Self::ExtendedPacket8), _ if value == (PacketSize::ExtendedPacket8 as u8) => Ok(Self::ExtendedPacket8),
_ if value == (PacketSize::ExtendedPacket16 as u8) => Ok(Self::ExtendedPacket16), _ if value == (PacketSize::ExtendedPacket16 as u8) => Ok(Self::ExtendedPacket16),
_ if value == (PacketSize::ExtendedPacket32 as u8) => Ok(Self::ExtendedPacket32), _ if value == (PacketSize::ExtendedPacket32 as u8) => Ok(Self::ExtendedPacket32),
_ if value == (PacketSize::ExtendedPacket10 as u8) => Ok(Self::ExtendedPacket10),
_ if value == (PacketSize::ExtendedPacket15 as u8) => Ok(Self::ExtendedPacket15),
_ if value == (PacketSize::ExtendedPacket20 as u8) => Ok(Self::ExtendedPacket20),
_ if value == (PacketSize::ExtendedPacket25 as u8) => Ok(Self::ExtendedPacket25),
_ if value == (PacketSize::ExtendedPacket50 as u8) => Ok(Self::ExtendedPacket50),
_ if value == (PacketSize::ExtendedPacket100 as u8) => Ok(Self::ExtendedPacket100),
_ if value == (PacketSize::ExtendedPacket150 as u8) => Ok(Self::ExtendedPacket150),
_ if value == (PacketSize::ExtendedPacket200 as u8) => Ok(Self::ExtendedPacket200),
_ if value == (PacketSize::ExtendedPacket250 as u8) => Ok(Self::ExtendedPacket250),
_ if value == (PacketSize::ExtendedPacket500 as u8) => Ok(Self::ExtendedPacket500),
v => Err(InvalidPacketSize::UnknownPacketTag { received: v }), v => Err(InvalidPacketSize::UnknownPacketTag { received: v }),
} }
} }
@@ -93,6 +135,16 @@ impl PacketSize {
PacketSize::ExtendedPacket8 => EXTENDED_PACKET_SIZE_8, PacketSize::ExtendedPacket8 => EXTENDED_PACKET_SIZE_8,
PacketSize::ExtendedPacket16 => EXTENDED_PACKET_SIZE_16, PacketSize::ExtendedPacket16 => EXTENDED_PACKET_SIZE_16,
PacketSize::ExtendedPacket32 => EXTENDED_PACKET_SIZE_32, PacketSize::ExtendedPacket32 => EXTENDED_PACKET_SIZE_32,
PacketSize::ExtendedPacket10 => EXTENDED_PACKET_SIZE_10,
PacketSize::ExtendedPacket15 => EXTENDED_PACKET_SIZE_15,
PacketSize::ExtendedPacket20 => EXTENDED_PACKET_SIZE_20,
PacketSize::ExtendedPacket25 => EXTENDED_PACKET_SIZE_25,
PacketSize::ExtendedPacket50 => EXTENDED_PACKET_SIZE_50,
PacketSize::ExtendedPacket100 => EXTENDED_PACKET_SIZE_100,
PacketSize::ExtendedPacket150 => EXTENDED_PACKET_SIZE_150,
PacketSize::ExtendedPacket200 => EXTENDED_PACKET_SIZE_200,
PacketSize::ExtendedPacket250 => EXTENDED_PACKET_SIZE_250,
PacketSize::ExtendedPacket500 => EXTENDED_PACKET_SIZE_500,
} }
} }
@@ -115,6 +167,26 @@ impl PacketSize {
Ok(PacketSize::ExtendedPacket16) Ok(PacketSize::ExtendedPacket16)
} else if PacketSize::ExtendedPacket32.size() == size { } else if PacketSize::ExtendedPacket32.size() == size {
Ok(PacketSize::ExtendedPacket32) Ok(PacketSize::ExtendedPacket32)
} else if PacketSize::ExtendedPacket10.size() == size {
Ok(PacketSize::ExtendedPacket10)
} else if PacketSize::ExtendedPacket15.size() == size {
Ok(PacketSize::ExtendedPacket15)
} else if PacketSize::ExtendedPacket20.size() == size {
Ok(PacketSize::ExtendedPacket20)
} else if PacketSize::ExtendedPacket25.size() == size {
Ok(PacketSize::ExtendedPacket25)
} else if PacketSize::ExtendedPacket50.size() == size {
Ok(PacketSize::ExtendedPacket50)
} else if PacketSize::ExtendedPacket100.size() == size {
Ok(PacketSize::ExtendedPacket100)
} else if PacketSize::ExtendedPacket150.size() == size {
Ok(PacketSize::ExtendedPacket150)
} else if PacketSize::ExtendedPacket200.size() == size {
Ok(PacketSize::ExtendedPacket200)
} else if PacketSize::ExtendedPacket250.size() == size {
Ok(PacketSize::ExtendedPacket250)
} else if PacketSize::ExtendedPacket500.size() == size {
Ok(PacketSize::ExtendedPacket500)
} else { } else {
Err(InvalidPacketSize::UnknownPacketSize { received: size }) Err(InvalidPacketSize::UnknownPacketSize { received: size })
} }
@@ -125,7 +197,17 @@ impl PacketSize {
PacketSize::RegularPacket | PacketSize::AckPacket => false, PacketSize::RegularPacket | PacketSize::AckPacket => false,
PacketSize::ExtendedPacket8 PacketSize::ExtendedPacket8
| PacketSize::ExtendedPacket16 | PacketSize::ExtendedPacket16
| PacketSize::ExtendedPacket32 => true, | PacketSize::ExtendedPacket32
| PacketSize::ExtendedPacket10
| PacketSize::ExtendedPacket15
| PacketSize::ExtendedPacket20
| PacketSize::ExtendedPacket25
| PacketSize::ExtendedPacket50
| PacketSize::ExtendedPacket100
| PacketSize::ExtendedPacket150
| PacketSize::ExtendedPacket200
| PacketSize::ExtendedPacket250
| PacketSize::ExtendedPacket500 => true,
} }
} }
@@ -47,6 +47,16 @@ impl From<u8> for PacketVersion {
n if n == PacketSize::ExtendedPacket8 as u8 => PacketVersion::Legacy, n if n == PacketSize::ExtendedPacket8 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket16 as u8 => PacketVersion::Legacy, n if n == PacketSize::ExtendedPacket16 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket32 as u8 => PacketVersion::Legacy, n if n == PacketSize::ExtendedPacket32 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket10 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket15 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket20 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket25 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket50 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket100 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket150 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket200 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket250 as u8 => PacketVersion::Legacy,
n if n == PacketSize::ExtendedPacket500 as u8 => PacketVersion::Legacy,
n => PacketVersion::Versioned(n), n => PacketVersion::Versioned(n),
} }
} }
+6
View File
@@ -50,6 +50,12 @@ tokio-stream = { version = "0.1.11", features = ["fs"] }
tokio-tungstenite = "0.14" tokio-tungstenite = "0.14"
tokio-util = { version = "0.7.4", features = ["codec"] } tokio-util = { version = "0.7.4", features = ["codec"] }
url = { version = "2.2", features = ["serde"] } url = { version = "2.2", features = ["serde"] }
tracing = "0.1.37"
tracing-subscriber = {version = "0.3.16", features = ["registry", "std", "env-filter"]}
tracing-opentelemetry = "0.18.0"
opentelemetry-jaeger = {version = "0.17.0", features = ["collector_client","rt-tokio","isahc_collector_client"]}
opentelemetry = {version = "0.18.0", features = ["rt-tokio"]}
tracing-flame = "0.2.0"
# internal # internal
nym-coconut-interface = { path = "../common/coconut-interface" } nym-coconut-interface = { path = "../common/coconut-interface" }
+1
View File
@@ -18,6 +18,7 @@ rand = { version = "0.7.3", features = ["wasm-bindgen"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0" thiserror = "1.0"
tracing = "0.1.29"
nym-crypto = { path = "../../common/crypto" } nym-crypto = { path = "../../common/crypto" }
nym-pemstore = { path = "../../common/pemstore" } nym-pemstore = { path = "../../common/pemstore" }
@@ -6,7 +6,6 @@ use crate::registration::handshake::shared_key::{SharedKeySize, SharedKeys};
use crate::registration::handshake::WsItem; use crate::registration::handshake::WsItem;
use crate::types; use crate::types;
use futures::{Sink, SinkExt, Stream, StreamExt}; use futures::{Sink, SinkExt, Stream, StreamExt};
use log::*;
use nym_crypto::{ use nym_crypto::{
asymmetric::{encryption, identity}, asymmetric::{encryption, identity},
generic_array::typenum::Unsigned, generic_array::typenum::Unsigned,
@@ -14,6 +13,7 @@ use nym_crypto::{
symmetric::stream_cipher, symmetric::stream_cipher,
}; };
use nym_sphinx::params::{GatewayEncryptionAlgorithm, GatewaySharedKeyHkdfAlgorithm}; use nym_sphinx::params::{GatewayEncryptionAlgorithm, GatewaySharedKeyHkdfAlgorithm};
use tracing::*;
use rand::{CryptoRng, RngCore}; use rand::{CryptoRng, RngCore};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use tungstenite::Message as WsMessage; use tungstenite::Message as WsMessage;
+5 -3
View File
@@ -7,6 +7,8 @@ use crate::{
OutputFormat, OutputFormat,
}; };
use clap::Args; use clap::Args;
use config::NymConfig;
use tracing::*;
use std::error::Error; use std::error::Error;
use std::net::IpAddr; use std::net::IpAddr;
use std::path::PathBuf; use std::path::PathBuf;
@@ -115,19 +117,19 @@ fn special_addresses() -> Vec<&'static str> {
pub async fn execute(args: Run, output: OutputFormat) -> Result<(), Box<dyn Error + Send + Sync>> { pub async fn execute(args: Run, output: OutputFormat) -> Result<(), Box<dyn Error + Send + Sync>> {
let id = args.id.clone(); let id = args.id.clone();
println!("Starting gateway {id}..."); println!("Starting gateway {id}...");
event!(Level::INFO, "Loading config");
let config = build_config(id, args)?; let config = build_config(id, args)?;
ensure_config_version_compatibility(&config)?; ensure_config_version_compatibility(&config)?;
if special_addresses().contains(&&*config.get_listening_address().to_string()) { if special_addresses().contains(&&*config.get_listening_address().to_string()) {
show_binding_warning(config.get_listening_address().to_string()); show_binding_warning(config.get_listening_address().to_string());
} }
event!(Level::INFO, "Creating Gateway node");
let mut gateway = crate::node::create_gateway(config).await; let mut gateway = crate::node::create_gateway(config).await;
eprintln!( eprintln!(
"\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\ "\nTo bond your gateway you will need to install the Nym wallet, go to https://nymtech.net/get-involved and select the Download button.\n\
Select the correct version and install it to your machine. You will need to provide the following: \n "); Select the correct version and install it to your machine. You will need to provide the following: \n ");
gateway.print_node_details(output)?; gateway.print_node_details(output)?;
//.instrument(info_span!("Gateway run"))
gateway.run().await gateway.run().await
} }
+40 -4
View File
@@ -4,11 +4,18 @@
use clap::{crate_name, crate_version, Parser, ValueEnum}; use clap::{crate_name, crate_version, Parser, ValueEnum};
use colored::Colorize; use colored::Colorize;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::error;
use nym_bin_common::logging::setup_logging; use nym_bin_common::logging::setup_logging;
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner}; use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
use nym_network_defaults::setup_env; use nym_network_defaults::setup_env;
use std::error::Error; use std::error::Error;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Registry, filter};
use tracing::*;
use tracing_flame::FlameLayer;
use std::time::Duration;
use std::thread::sleep;
mod commands; mod commands;
mod config; mod config;
@@ -64,14 +71,39 @@ impl Cli {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
setup_logging(); let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("143.42.21.138:6831")
.with_service_name("nym_gateway")
.with_auto_split_batch(true)
.install_batch(opentelemetry::runtime::Tokio)
.expect("Failed to initialize tracer");
//let jaeger_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let hyper_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("hyper")});
let tokio_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("tokio")});
let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded").unwrap();
let subscriber = Registry::default()
.with(EnvFilter::from_default_env())
.with(hyper_filter)
.with(tokio_filter)
.with(tracing_subscriber::fmt::layer().pretty())
.with(flame_layer);
//.with(jaeger_layer)
// tracing::subscriber::set_global_default(subscriber)
// .expect("Failed to set global subscriber");
//tracing_subscriber::fmt::init();
//setup_logging();
if atty::is(atty::Stream::Stdout) { if atty::is(atty::Stream::Stdout) {
println!("{}", banner(crate_name!(), crate_version!())); println!("{}", banner(crate_name!(), crate_version!()));
} }
let args = Cli::parse(); let args = Cli::parse();
setup_env(args.config_env_file.as_ref()); setup_env(args.config_env_file.as_ref());
//.instrument(info_span!("Execute Run"))
commands::execute(args).await.map_err(|err| { commands::execute(args).await.map_err(|err| {
if atty::is(atty::Stream::Stdout) { if atty::is(atty::Stream::Stdout) {
let error_message = format!("{err}").red(); let error_message = format!("{err}").red();
@@ -79,7 +111,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
error!("Exiting..."); error!("Exiting...");
} }
err err
}) });
//sleep(Duration::from_secs(5));
//opentelemetry::global::shutdown_tracer_provider();
Ok(())
} }
#[cfg(test)] #[cfg(test)]
@@ -9,8 +9,8 @@ use futures::StreamExt;
use gateway_requests::iv::IVConversionError; use gateway_requests::iv::IVConversionError;
use gateway_requests::types::{BinaryRequest, ServerResponse}; use gateway_requests::types::{BinaryRequest, ServerResponse};
use gateway_requests::{ClientControlRequest, GatewayRequestsError}; use gateway_requests::{ClientControlRequest, GatewayRequestsError};
use log::*;
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
use tracing::*;
use rand::{CryptoRng, Rng}; use rand::{CryptoRng, Rng};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::process; use std::process;
@@ -132,6 +132,7 @@ where
} }
/// Explicitly removes handle from the global store. /// Explicitly removes handle from the global store.
#[instrument(level="debug", skip_all)]
fn disconnect(self) { fn disconnect(self) {
self.inner self.inner
.active_clients_store .active_clients_store
@@ -180,6 +181,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `mix_packet`: packet received from the client that should get forwarded into the network. /// * `mix_packet`: packet received from the client that should get forwarded into the network.
#[instrument(level="debug", skip_all)]
fn forward_packet(&self, mix_packet: MixPacket) { fn forward_packet(&self, mix_packet: MixPacket) {
if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) { if let Err(err) = self.inner.outbound_mix_sender.unbounded_send(mix_packet) {
error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue."); error!("We failed to forward requested mix packet - {err}. Presumably our mix forwarder has crashed. We cannot continue.");
@@ -279,13 +281,14 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `mix_packet`: packet received from the client that should get forwarded into the network. /// * `mix_packet`: packet received from the client that should get forwarded into the network.
#[instrument(level="debug", skip_all)]
async fn handle_forward_sphinx( async fn handle_forward_sphinx(
&self, &self,
mix_packet: MixPacket, mix_packet: MixPacket,
) -> Result<ServerResponse, RequestHandlingError> { ) -> Result<ServerResponse, RequestHandlingError> {
let consumed_bandwidth = mix_packet.sphinx_packet().len() as i64; let consumed_bandwidth = mix_packet.sphinx_packet().len() as i64;
let available_bandwidth = self.get_available_bandwidth().await?; let available_bandwidth = self.get_available_bandwidth().instrument(trace_span!("Get available bandwidth")).await?;
if available_bandwidth < consumed_bandwidth { if available_bandwidth < consumed_bandwidth {
return Ok(ServerResponse::new_error( return Ok(ServerResponse::new_error(
@@ -293,7 +296,7 @@ where
)); ));
} }
self.consume_bandwidth(consumed_bandwidth).await?; self.consume_bandwidth(consumed_bandwidth).instrument(trace_span!("Consume bandwidth")).await?;
self.forward_packet(mix_packet); self.forward_packet(mix_packet);
Ok(ServerResponse::Send { Ok(ServerResponse::Send {
@@ -306,6 +309,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `bin_msg`: raw message to handle. /// * `bin_msg`: raw message to handle.
#[instrument(level="debug", skip_all)]
async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message { async fn handle_binary(&self, bin_msg: Vec<u8>) -> Message {
// this function decrypts the request and checks the MAC // this function decrypts the request and checks the MAC
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) { match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
@@ -327,6 +331,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `raw_request`: raw message to handle. /// * `raw_request`: raw message to handle.
#[instrument(level="debug", skip_all)]
async fn handle_text(&mut self, raw_request: String) -> Message { async fn handle_text(&mut self, raw_request: String) -> Message {
match ClientControlRequest::try_from(raw_request) { match ClientControlRequest::try_from(raw_request) {
Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(), Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(),
@@ -349,6 +354,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `raw_request`: raw received websocket message. /// * `raw_request`: raw received websocket message.
//#[instrument(level="info", skip_all)]
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> { async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore // apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from // them and let's test that claim. If that's not the case, just copy code from
@@ -363,6 +369,7 @@ where
/// Simultaneously listens for incoming client requests, which realistically should only be /// Simultaneously listens for incoming client requests, which realistically should only be
/// binary requests to forward sphinx packets or increase bandwidth /// binary requests to forward sphinx packets or increase bandwidth
/// and for sphinx packets received from the mix network that should be sent back to the client. /// and for sphinx packets received from the mix network that should be sent back to the client.
#[instrument(level="info", skip_all, name="Serving requests")]
pub(crate) async fn listen_for_requests(mut self, mut shutdown: TaskClient) pub(crate) async fn listen_for_requests(mut self, mut shutdown: TaskClient)
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
@@ -373,14 +380,16 @@ where
while !shutdown.is_shutdown() { while !shutdown.is_shutdown() {
tokio::select! { tokio::select! {
_ = shutdown.recv() => { _ = shutdown.recv() => {
log::trace!("client_handling::AuthenticatedHandler: received shutdown"); trace!("client_handling::AuthenticatedHandler: received shutdown");
} }
socket_msg = self.inner.read_websocket_message() => { socket_msg = self.inner.read_websocket_message() => {
debug!("Handling client request");
let socket_msg = match socket_msg { let socket_msg = match socket_msg {
None => break, None => break,
Some(Ok(socket_msg)) => socket_msg, Some(Ok(socket_msg)) => socket_msg,
Some(Err(err)) => { Some(Err(err)) => {
error!("failed to obtain message from websocket stream! stopping connection handler: {err}"); log::error!("failed to obtain message from websocket stream! stopping connection handler: {err}");
break; break;
} }
}; };
@@ -390,6 +399,7 @@ where
} }
if let Some(response) = self.handle_request(socket_msg).await { if let Some(response) = self.handle_request(socket_msg).await {
trace!("Sending response to client request");
if let Err(err) = self.inner.send_websocket_message(response).await { if let Err(err) = self.inner.send_websocket_message(response).await {
warn!( warn!(
"Failed to send message over websocket: {err}. Assuming the connection is dead.", "Failed to send message over websocket: {err}. Assuming the connection is dead.",
@@ -397,13 +407,17 @@ where
break; break;
} }
} }
}, },
mix_messages = self.mix_receiver.next() => { mix_messages = self.mix_receiver.next() => {
//let span = info_span!("Processing mixnet message");
//let guard = span.enter();
let mix_messages = mix_messages.expect("sender was unexpectedly closed! this shouldn't have ever happened!"); let mix_messages = mix_messages.expect("sender was unexpectedly closed! this shouldn't have ever happened!");
if let Err(err) = self.inner.push_packets_to_client(self.client.shared_keys, mix_messages).await { if let Err(err) = self.inner.push_packets_to_client(self.client.shared_keys, mix_messages).await {
warn!("failed to send the unwrapped sphinx packets back to the client - {err}, assuming the connection is dead"); warn!("failed to send the unwrapped sphinx packets back to the client - {err}, assuming the connection is dead");
break; break;
} }
//drop(guard);
} }
} }
} }
@@ -1,8 +1,9 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net> // Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use log::*;
use nym_coconut_interface::Credential; use nym_coconut_interface::Credential;
use coconut_interface::Credential;
use tracing::*;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use validator_client::nyxd::traits::DkgQueryClient; use validator_client::nyxd::traits::DkgQueryClient;
use validator_client::{ use validator_client::{
@@ -17,7 +17,7 @@ use gateway_requests::registration::handshake::error::HandshakeError;
use gateway_requests::registration::handshake::{gateway_handshake, SharedKeys}; use gateway_requests::registration::handshake::{gateway_handshake, SharedKeys};
use gateway_requests::types::{ClientControlRequest, ServerResponse}; use gateway_requests::types::{ClientControlRequest, ServerResponse};
use gateway_requests::{BinaryResponse, PROTOCOL_VERSION}; use gateway_requests::{BinaryResponse, PROTOCOL_VERSION};
use log::*; use tracing::*;
use mixnet_client::forwarder::MixForwardingSender; use mixnet_client::forwarder::MixForwardingSender;
use nym_crypto::asymmetric::identity; use nym_crypto::asymmetric::identity;
use nym_sphinx::DestinationAddressBytes; use nym_sphinx::DestinationAddressBytes;
@@ -111,6 +111,7 @@ where
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket /// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
/// to the framed WebSocket. /// to the framed WebSocket.
#[instrument(level="debug", skip_all)]
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError> pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
@@ -134,6 +135,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key. /// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
#[instrument(level="debug", skip_all)]
async fn perform_registration_handshake( async fn perform_registration_handshake(
&mut self, &mut self,
init_msg: Vec<u8>, init_msg: Vec<u8>,
@@ -157,6 +159,7 @@ where
} }
/// Attempts to read websocket message from the associated socket. /// Attempts to read websocket message from the associated socket.
#[instrument(level="debug", skip_all)]
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>> pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
@@ -172,6 +175,7 @@ where
/// # Arguments /// # Arguments
/// ///
/// * `msg`: WebSocket message to write back to the client. /// * `msg`: WebSocket message to write back to the client.
#[instrument(level="debug", skip_all)]
pub(crate) async fn send_websocket_message(&mut self, msg: Message) -> Result<(), WsError> pub(crate) async fn send_websocket_message(&mut self, msg: Message) -> Result<(), WsError>
where where
S: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin,
@@ -192,6 +196,7 @@ where
/// ///
/// * `shared_keys`: keys derived between the client and gateway. /// * `shared_keys`: keys derived between the client and gateway.
/// * `packets`: unwrapped packets that are to be pushed back to the client. /// * `packets`: unwrapped packets that are to be pushed back to the client.
#[instrument(level="debug", skip_all, fields(packets = packets.len()))]
pub(crate) async fn push_packets_to_client( pub(crate) async fn push_packets_to_client(
&mut self, &mut self,
shared_keys: SharedKeys, shared_keys: SharedKeys,
@@ -202,6 +207,7 @@ where
{ {
// note: into_ws_message encrypts the requests and adds a MAC on it. Perhaps it should // note: into_ws_message encrypts the requests and adds a MAC on it. Perhaps it should
// be more explicit in the naming? // be more explicit in the naming?
debug!("Pushing {} packets", packets.len());
let messages: Vec<Result<Message, WsError>> = packets let messages: Vec<Result<Message, WsError>> = packets
.into_iter() .into_iter()
.map(|received_message| { .map(|received_message| {
@@ -561,6 +567,7 @@ where
/// result in client getting registered or authenticated. All other requests, such as forwarding /// result in client getting registered or authenticated. All other requests, such as forwarding
/// sphinx packets considered an error and terminate the connection. /// sphinx packets considered an error and terminate the connection.
// TODO: somehow cleanup this method // TODO: somehow cleanup this method
#[instrument(level="debug", skip_all)]
pub(crate) async fn perform_initial_authentication( pub(crate) async fn perform_initial_authentication(
mut self, mut self,
) -> Option<AuthenticatedHandler<R, S, St>> ) -> Option<AuthenticatedHandler<R, S, St>>
@@ -597,6 +604,7 @@ where
} }
} }
Ok(auth_result) => { Ok(auth_result) => {
debug!("Confirming auth");
if let Err(err) = self if let Err(err) = self
.send_websocket_message(auth_result.server_response.into()) .send_websocket_message(auth_result.server_response.into())
.await .await
@@ -4,9 +4,9 @@
use crate::node::storage::Storage; use crate::node::storage::Storage;
use gateway_requests::registration::handshake::SharedKeys; use gateway_requests::registration::handshake::SharedKeys;
use gateway_requests::ServerResponse; use gateway_requests::ServerResponse;
use log::{trace, warn};
use nym_sphinx::DestinationAddressBytes; use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient; use nym_task::TaskClient;
use tracing::*;
use rand::{CryptoRng, Rng}; use rand::{CryptoRng, Rng};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
@@ -5,7 +5,7 @@ use crate::node::client_handling::active_clients::ActiveClientsStore;
use crate::node::client_handling::websocket::connection_handler::coconut::CoconutVerifier; use crate::node::client_handling::websocket::connection_handler::coconut::CoconutVerifier;
use crate::node::client_handling::websocket::connection_handler::FreshHandler; use crate::node::client_handling::websocket::connection_handler::FreshHandler;
use crate::node::storage::Storage; use crate::node::storage::Storage;
use log::*; use tracing::*;
use mixnet_client::forwarder::MixForwardingSender; use mixnet_client::forwarder::MixForwardingSender;
use nym_crypto::asymmetric::identity; use nym_crypto::asymmetric::identity;
use rand::rngs::OsRng; use rand::rngs::OsRng;
@@ -60,7 +60,7 @@ impl Listener {
tokio::select! { tokio::select! {
biased; biased;
_ = shutdown.recv() => { _ = shutdown.recv() => {
log::trace!("client_handling::Listener: received shutdown"); trace!("client_handling::Listener: received shutdown");
} }
connection = tcp_listener.accept() => { connection = tcp_listener.accept() => {
match connection { match connection {
@@ -79,7 +79,7 @@ impl Listener {
Arc::clone(&self.coconut_verifier), Arc::clone(&self.coconut_verifier),
); );
let shutdown = shutdown.clone(); let shutdown = shutdown.clone();
tokio::spawn(async move { handle.start_handling(shutdown).await }); tokio::spawn(async move { handle.start_handling(shutdown).await }.instrument(info_span!("Connection handling", address = %remote_addr)));
} }
Err(err) => warn!("failed to get client: {err}"), Err(err) => warn!("failed to get client: {err}"),
} }
@@ -102,6 +102,6 @@ impl Listener {
tokio::spawn(async move { tokio::spawn(async move {
self.run(outbound_mix_sender, storage, active_clients_store, shutdown) self.run(outbound_mix_sender, storage, active_clients_store, shutdown)
.await .await
}) }.instrument(info_span!("Client Listener")))
} }
} }
@@ -7,7 +7,7 @@ use crate::node::mixnet_handling::receiver::packet_processing::PacketProcessor;
use crate::node::storage::error::StorageError; use crate::node::storage::error::StorageError;
use crate::node::storage::Storage; use crate::node::storage::Storage;
use futures::StreamExt; use futures::StreamExt;
use log::*; use tracing::*;
use mixnet_client::forwarder::MixForwardingSender; use mixnet_client::forwarder::MixForwardingSender;
use mixnode_common::packet_processor::processor::ProcessedFinalHop; use mixnode_common::packet_processor::processor::ProcessedFinalHop;
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
@@ -86,7 +86,7 @@ impl<St: Storage> ConnectionHandler<St> {
} }
} }
} }
#[instrument(level="debug", skip_all)]
fn try_push_message_to_client( fn try_push_message_to_client(
&mut self, &mut self,
client_address: DestinationAddressBytes, client_address: DestinationAddressBytes,
@@ -105,7 +105,7 @@ impl<St: Storage> ConnectionHandler<St> {
} }
} }
} }
#[instrument(level="debug", skip_all)]
pub(crate) async fn store_processed_packet_payload( pub(crate) async fn store_processed_packet_payload(
&self, &self,
client_address: DestinationAddressBytes, client_address: DestinationAddressBytes,
@@ -118,7 +118,7 @@ impl<St: Storage> ConnectionHandler<St> {
self.storage.store_message(client_address, message).await self.storage.store_message(client_address, message).await
} }
#[instrument(level="debug", skip_all)]
fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) { fn forward_ack(&self, forward_ack: Option<MixPacket>, client_address: DestinationAddressBytes) {
if let Some(forward_ack) = forward_ack { if let Some(forward_ack) = forward_ack {
trace!( trace!(
@@ -130,7 +130,7 @@ impl<St: Storage> ConnectionHandler<St> {
self.ack_sender.unbounded_send(forward_ack).unwrap(); self.ack_sender.unbounded_send(forward_ack).unwrap();
} }
} }
#[instrument(level="debug", skip_all)]
async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) { async fn handle_processed_packet(&mut self, processed_final_hop: ProcessedFinalHop) {
let client_address = processed_final_hop.destination; let client_address = processed_final_hop.destination;
let message = processed_final_hop.message; let message = processed_final_hop.message;
@@ -154,7 +154,7 @@ impl<St: Storage> ConnectionHandler<St> {
// received ack back into the network // received ack back into the network
self.forward_ack(forward_ack, client_address); self.forward_ack(forward_ack, client_address);
} }
//#[instrument(level="info", skip_all)]
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedSphinxPacket) { async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedSphinxPacket) {
// //
// TODO: here be replay attack detection - it will require similar key cache to the one in // TODO: here be replay attack detection - it will require similar key cache to the one in
@@ -187,7 +187,7 @@ impl<St: Storage> ConnectionHandler<St> {
tokio::select! { tokio::select! {
biased; biased;
_ = shutdown.recv() => { _ = shutdown.recv() => {
log::trace!("ConnectionHandler: received shutdown"); trace!("ConnectionHandler: received shutdown");
} }
Some(framed_sphinx_packet) = framed_conn.next() => { Some(framed_sphinx_packet) = framed_conn.next() => {
match framed_sphinx_packet { match framed_sphinx_packet {
@@ -3,8 +3,8 @@
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler; use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use crate::node::storage::Storage; use crate::node::storage::Storage;
use log::*;
use nym_task::TaskClient; use nym_task::TaskClient;
use tracing::*;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::process; use std::process;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@@ -37,13 +37,14 @@ impl Listener {
tokio::select! { tokio::select! {
biased; biased;
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
log::trace!("mixnet_handling::Listener: Received shutdown"); trace!("mixnet_handling::Listener: Received shutdown");
} }
connection = tcp_listener.accept() => { connection = tcp_listener.accept() => {
match connection { match connection {
Ok((socket, remote_addr)) => { Ok((socket, remote_addr)) => {
let handler = connection_handler.clone(); let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone())); tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone())
.instrument(info_span!("Mixnet connection handling", address = %remote_addr)));
} }
Err(err) => warn!("failed to get client: {err}"), Err(err) => warn!("failed to get client: {err}"),
} }
@@ -58,6 +59,6 @@ impl Listener {
{ {
info!("Running mix listener on {:?}", self.address.to_string()); info!("Running mix listener on {:?}", self.address.to_string());
tokio::spawn(async move { self.run(connection_handler).await }) tokio::spawn(async move { self.run(connection_handler).await }.instrument(info_span!("Mixnet Listener")))
} }
} }
+3 -4
View File
@@ -14,7 +14,7 @@ use crate::node::statistics::collector::GatewayStatisticsCollector;
use crate::node::storage::Storage; use crate::node::storage::Storage;
use crate::{commands::sign::load_identity_keys, OutputFormat}; use crate::{commands::sign::load_identity_keys, OutputFormat};
use colored::Colorize; use colored::Colorize;
use log::*; use tracing::*;
use mixnet_client::forwarder::{MixForwardingSender, PacketForwarder}; use mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_crypto::asymmetric::{encryption, identity}; use nym_crypto::asymmetric::{encryption, identity};
use nym_network_defaults::NymNetworkDetails; use nym_network_defaults::NymNetworkDetails;
@@ -220,8 +220,7 @@ where
self.config.get_use_legacy_sphinx_framing(), self.config.get_use_legacy_sphinx_framing(),
shutdown, shutdown,
); );
tokio::spawn(async move { packet_forwarder.run().await }.instrument(info_span!("Packet Forwarder")));
tokio::spawn(async move { packet_forwarder.run().await });
packet_sender packet_sender
} }
@@ -230,7 +229,7 @@ where
shutdown: TaskManager, shutdown: TaskManager,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
let res = shutdown.catch_interrupt().await; let res = shutdown.catch_interrupt().await;
log::info!("Stopping nym gateway"); info!("Stopping nym gateway");
res res
} }
+6
View File
@@ -36,6 +36,12 @@ tokio = { version="1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version="0.7.3", features = ["codec"] } tokio-util = { version="0.7.3", features = ["codec"] }
toml = "0.5.8" toml = "0.5.8"
url = { version = "2.2", features = ["serde"] } url = { version = "2.2", features = ["serde"] }
tracing = "0.1.37"
tracing-subscriber = {version = "0.3.16", features = ["registry", "std", "env-filter"]}
tracing-opentelemetry = "0.18.0"
opentelemetry-jaeger = {version = "0.17.0", features = ["collector_client","rt-tokio","isahc_collector_client"]}
opentelemetry = {version = "0.18.0", features = ["rt-tokio"]}
tracing-flame = "0.2.0"
atty = "0.2" atty = "0.2"
## internal ## internal
+30 -1
View File
@@ -7,6 +7,9 @@ extern crate rocket;
use ::nym_config::defaults::setup_env; use ::nym_config::defaults::setup_env;
use clap::{crate_name, crate_version, Parser, ValueEnum}; use clap::{crate_name, crate_version, Parser, ValueEnum};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Registry, filter};
use tracing_flame::FlameLayer;
use nym_bin_common::logging::setup_logging; use nym_bin_common::logging::setup_logging;
use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner}; use nym_bin_common::{build_information::BinaryBuildInformation, logging::banner};
@@ -62,7 +65,32 @@ impl Cli {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
setup_logging(); //setup_logging();
//let tracer = opentelemetry_jaeger::new_agent_pipeline()
//.with_endpoint("143.42.21.138:6831")
//.with_service_name("nym_mixnode1")
//.with_auto_split_batch(true)
//.install_batch(opentelemetry::runtime::Tokio)
//.expect("Failed to initialize tracer");
//let jaeger_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let hyper_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("hyper")});
let tokio_filter = filter::filter_fn(|metadata| {!metadata.target().starts_with("tokio")});
//let (flame_layer, _guard) = FlameLayer::with_file("./tracing.folded").unwrap();
let subscriber = Registry::default()
.with(EnvFilter::from_default_env())
.with(hyper_filter)
.with(tokio_filter)
.with(tracing_subscriber::fmt::layer().pretty());
//.with(flame_layer);
//.with(jaeger_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("Failed to set global subscriber");
if atty::is(atty::Stream::Stdout) { if atty::is(atty::Stream::Stdout) {
println!("{}", banner(crate_name!(), crate_version!())); println!("{}", banner(crate_name!(), crate_version!()));
} }
@@ -70,6 +98,7 @@ async fn main() {
let args = Cli::parse(); let args = Cli::parse();
setup_env(args.config_env_file.as_ref()); setup_env(args.config_env_file.as_ref());
commands::execute(args).await; commands::execute(args).await;
opentelemetry::global::shutdown_tracer_provider();
} }
#[cfg(test)] #[cfg(test)]
@@ -7,8 +7,10 @@ use crate::node::listener::connection_handler::packet_processing::{
use crate::node::packet_delayforwarder::PacketDelayForwardSender; use crate::node::packet_delayforwarder::PacketDelayForwardSender;
use crate::node::TaskClient; use crate::node::TaskClient;
use futures::StreamExt; use futures::StreamExt;
use log::{error, info}; use tracing::{error, info, trace, debug, warn};
use tracing::*;
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::framing::codec::SphinxCodec; use nym_sphinx::framing::codec::SphinxCodec;
use nym_sphinx::framing::packet::FramedSphinxPacket; use nym_sphinx::framing::packet::FramedSphinxPacket;
use nym_sphinx::Delay as SphinxDelay; use nym_sphinx::Delay as SphinxDelay;
@@ -16,6 +18,7 @@ use std::net::SocketAddr;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::time::Instant; use tokio::time::Instant;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use std::time::{SystemTime,UNIX_EPOCH};
pub(crate) mod packet_processing; pub(crate) mod packet_processing;
@@ -35,10 +38,12 @@ impl ConnectionHandler {
delay_forwarding_channel, delay_forwarding_channel,
} }
} }
#[instrument(level="debug", skip_all, "Sending to packet forwarder", fields(packet_size))]
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) { fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
// determine instant at which packet should get forwarded. this way we minimise effect of // determine instant at which packet should get forwarded. this way we minimise effect of
// being stuck in the queue [of the channel] to get inserted into the delay queue // being stuck in the queue [of the channel] to get inserted into the delay queue
let packet_size = PacketSize::get_type(mix_packet.sphinx_packet().len()).unwrap();
Span::current().record("packet_size", field::debug(packet_size));
let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration()); let forward_instant = delay.map(|delay| Instant::now() + delay.to_duration());
// if unbounded_send() failed it means that the receiver channel was disconnected // if unbounded_send() failed it means that the receiver channel was disconnected
@@ -47,7 +52,7 @@ impl ConnectionHandler {
.unbounded_send((mix_packet, forward_instant)) .unbounded_send((mix_packet, forward_instant))
.expect("the delay-forwarder has died!"); .expect("the delay-forwarder has died!");
} }
#[instrument(level="info", skip_all, "Handling packet",fields(packet_size=?framed_sphinx_packet.packet_size()))]
fn handle_received_packet(&self, framed_sphinx_packet: FramedSphinxPacket) { fn handle_received_packet(&self, framed_sphinx_packet: FramedSphinxPacket) {
// //
// TODO: here be replay attack detection - it will require similar key cache to the one in // TODO: here be replay attack detection - it will require similar key cache to the one in
@@ -69,7 +74,7 @@ impl ConnectionHandler {
}, },
} }
} }
#[instrument(level="info", skip_all, "Connection handling", fields(address=%remote))]
pub(crate) async fn handle_connection( pub(crate) async fn handle_connection(
self, self,
conn: TcpStream, conn: TcpStream,
@@ -83,7 +88,7 @@ impl ConnectionHandler {
tokio::select! { tokio::select! {
biased; biased;
_ = shutdown.recv() => { _ = shutdown.recv() => {
log::trace!("ConnectionHandler: received shutdown"); trace!("ConnectionHandler: received shutdown");
} }
Some(framed_sphinx_packet) = framed_conn.next() => { Some(framed_sphinx_packet) = framed_conn.next() => {
match framed_sphinx_packet { match framed_sphinx_packet {
@@ -96,7 +101,9 @@ impl ConnectionHandler {
// in theory we could process multiple sphinx packet from the same connection in parallel, // in theory we could process multiple sphinx packet from the same connection in parallel,
// but we already handle multiple concurrent connections so if anything, making // but we already handle multiple concurrent connections so if anything, making
// that change would only slow things down // that change would only slow things down
//println!("{:?}_In_{:?}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros(), framed_sphinx_packet.packet_size().size());
self.handle_received_packet(framed_sphinx_packet); self.handle_received_packet(framed_sphinx_packet);
//println!("{:?}_Processed_{:?}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros(), mix_packet.sphinx_packet().len());
} }
Err(err) => { Err(err) => {
error!( error!(
@@ -113,6 +120,6 @@ impl ConnectionHandler {
"Closing connection from {:?}", "Closing connection from {:?}",
framed_conn.into_inner().peer_addr() framed_conn.into_inner().peer_addr()
); );
log::trace!("ConnectionHandler: Exiting"); trace!("ConnectionHandler: Exiting");
} }
} }
+7 -6
View File
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::node::listener::connection_handler::ConnectionHandler; use crate::node::listener::connection_handler::ConnectionHandler;
use log::error; use tracing::*;
use tracing::{error, info, trace, warn};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::process; use std::process;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@@ -21,9 +22,9 @@ impl Listener {
pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self { pub(crate) fn new(address: SocketAddr, shutdown: TaskClient) -> Self {
Listener { address, shutdown } Listener { address, shutdown }
} }
#[instrument(level="info", skip_all, "Mixnet Listener")]
async fn run(&mut self, connection_handler: ConnectionHandler) { async fn run(&mut self, connection_handler: ConnectionHandler) {
log::trace!("Starting Listener"); trace!("Starting Listener");
let listener = match TcpListener::bind(self.address).await { let listener = match TcpListener::bind(self.address).await {
Ok(listener) => listener, Ok(listener) => listener,
Err(err) => { Err(err) => {
@@ -36,20 +37,20 @@ impl Listener {
tokio::select! { tokio::select! {
biased; biased;
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
log::trace!("Listener: Received shutdown"); trace!("Listener: Received shutdown");
} }
connection = listener.accept() => { connection = listener.accept() => {
match connection { match connection {
Ok((socket, remote_addr)) => { Ok((socket, remote_addr)) => {
let handler = connection_handler.clone(); let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone())); tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()).instrument(info_span!("Connection handling", address = %remote_addr)));
} }
Err(err) => warn!("Failed to accept incoming connection - {err}"), Err(err) => warn!("Failed to accept incoming connection - {err}"),
} }
}, },
}; };
} }
log::trace!("Listener: Exiting"); trace!("Listener: Exiting");
} }
pub(crate) fn start(mut self, connection_handler: ConnectionHandler) -> JoinHandle<()> { pub(crate) fn start(mut self, connection_handler: ConnectionHandler) -> JoinHandle<()> {
+5 -4
View File
@@ -17,9 +17,10 @@ use crate::node::listener::Listener;
use crate::node::node_description::NodeDescription; use crate::node::node_description::NodeDescription;
use crate::node::node_statistics::SharedNodeStats; use crate::node::node_statistics::SharedNodeStats;
use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender}; use crate::node::packet_delayforwarder::{DelayForwarder, PacketDelayForwardSender};
use tracing::*;
use tracing::{info, error, warn};
use crate::OutputFormat; use crate::OutputFormat;
use colored::Colorize; use colored::Colorize;
use log::{error, info, warn};
use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer}; use mixnode_common::verloc::{self, AtomicVerlocResult, VerlocMeasurer};
use nym_bin_common::version_checker::parse_version; use nym_bin_common::version_checker::parse_version;
use nym_config::NymConfig; use nym_config::NymConfig;
@@ -215,7 +216,7 @@ impl MixNode {
let packet_sender = packet_forwarder.sender(); let packet_sender = packet_forwarder.sender();
tokio::spawn(async move { packet_forwarder.run().await }); tokio::spawn(async move { packet_forwarder.run().await }.instrument(info_span!("Packet delay forwarder")));
packet_sender packet_sender
} }
@@ -294,7 +295,7 @@ impl MixNode {
async fn wait_for_interrupt(&self, shutdown: TaskManager) { async fn wait_for_interrupt(&self, shutdown: TaskManager) {
let _res = shutdown.catch_interrupt().await; let _res = shutdown.catch_interrupt().await;
log::info!("Stopping nym mixnode"); info!("Stopping nym mixnode");
} }
pub async fn run(&mut self) { pub async fn run(&mut self) {
@@ -304,7 +305,7 @@ impl MixNode {
if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() { if duplicate_node_key == self.identity_keypair.public_key().to_base58_string() {
warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing"); warn!("You seem to have bonded your mixnode before starting it - that's highly unrecommended as in the future it might result in slashing");
} else { } else {
log::error!( error!(
"Our announce-host is identical to an existing node's announce-host! (its key is {:?})", "Our announce-host is identical to an existing node's announce-host! (its key is {:?})",
duplicate_node_key duplicate_node_key
); );
+10 -5
View File
@@ -4,10 +4,13 @@
use crate::node::node_statistics::UpdateSender; use crate::node::node_statistics::UpdateSender;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::StreamExt; use futures::StreamExt;
use nym_sphinx::params::packet_sizes::PacketSize;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue}; use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue};
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
use std::io; use std::io;
use tokio::time::Instant; use tokio::time::Instant;
use tracing::*;
use tracing::{trace};
use super::TaskClient; use super::TaskClient;
@@ -55,11 +58,13 @@ where
pub(crate) fn sender(&self) -> PacketDelayForwardSender { pub(crate) fn sender(&self) -> PacketDelayForwardSender {
self.packet_sender.clone() self.packet_sender.clone()
} }
#[instrument(level="debug", skip_all, "Forwarding packet", fields(packet_size))]
fn forward_packet(&mut self, packet: MixPacket) { fn forward_packet(&mut self, packet: MixPacket) {
let next_hop = packet.next_hop(); let next_hop = packet.next_hop();
let packet_mode = packet.packet_mode(); let packet_mode = packet.packet_mode();
let sphinx_packet = packet.into_sphinx_packet(); let sphinx_packet = packet.into_sphinx_packet();
let packet_size = PacketSize::get_type(sphinx_packet.len()).unwrap();
Span::current().record("packet_size", field::debug(packet_size));
if let Err(err) = if let Err(err) =
self.mixnet_client self.mixnet_client
@@ -87,7 +92,7 @@ where
let delayed_packet = packet.into_inner(); let delayed_packet = packet.into_inner();
self.forward_packet(delayed_packet) self.forward_packet(delayed_packet)
} }
#[instrument(level="debug", skip_all, "Handling packet", fields(packet_size))]
fn handle_new_packet(&mut self, new_packet: (MixPacket, Option<Instant>)) { fn handle_new_packet(&mut self, new_packet: (MixPacket, Option<Instant>)) {
// in case of a zero delay packet, don't bother putting it in the delay queue, // in case of a zero delay packet, don't bother putting it in the delay queue,
// just forward it immediately // just forward it immediately
@@ -105,7 +110,7 @@ where
} }
pub(crate) async fn run(&mut self) { pub(crate) async fn run(&mut self) {
log::trace!("Starting DelayForwarder"); trace!("Starting DelayForwarder");
loop { loop {
tokio::select! { tokio::select! {
delayed = self.delay_queue.next() => { delayed = self.delay_queue.next() => {
@@ -117,12 +122,12 @@ where
self.handle_new_packet(new_packet.unwrap()) self.handle_new_packet(new_packet.unwrap())
} }
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
log::trace!("DelayForwarder: Received shutdown"); trace!("DelayForwarder: Received shutdown");
break; break;
} }
} }
} }
log::trace!("DelayForwarder: Exiting"); trace!("DelayForwarder: Exiting");
} }
} }
+2 -2
View File
@@ -127,8 +127,8 @@ impl RewardedSetUpdater {
} }
// Reward all the nodes in the still current, soon to be previous rewarded set // Reward all the nodes in the still current, soon to be previous rewarded set
log::info!("Rewarding the current rewarded set..."); //log::info!("Rewarding the current rewarded set...");
self.reward_current_rewarded_set(interval).await?; //self.reward_current_rewarded_set(interval).await?;
// note: those operations don't really have to be atomic, so it's fine to send them // note: those operations don't really have to be atomic, so it's fine to send them
// as separate transactions // as separate transactions
+1 -1
View File
@@ -68,7 +68,7 @@ pub async fn get_rewarded_set_detailed(
#[openapi(tag = "contract-cache")] #[openapi(tag = "contract-cache")]
#[get("/mixnodes/active")] #[get("/mixnodes/active")]
pub async fn get_active_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> { pub async fn get_active_set(cache: &State<NymContractCache>) -> Json<Vec<MixNodeDetails>> {
Json(cache.active_set().await.value) Json(cache.mixnodes_filtered().await)
} }
// DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated, // DEPRECATED: this endpoint now lives in `node_status_api`. Once all consumers are updated,