Compare commits

...

3 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 4cc63bac1c Fix Box ws_fd 2024-02-13 12:06:10 +02:00
Bogdan-Ștefan Neacşu 6519bfa533 Add log 2024-02-13 11:45:25 +02:00
Bogdan-Ștefan Neacşu dc9823334a Testing 2024-02-12 20:10:09 +02:00
7 changed files with 72 additions and 0 deletions
@@ -52,6 +52,7 @@ use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider; use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient; use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug; use std::fmt::Debug;
use std::os::fd::RawFd;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use url::Url; use url::Url;
@@ -683,6 +684,7 @@ where
packet_stats_reporter.clone(), packet_stats_reporter.clone(),
); );
let gateway_fd = gateway_transceiver.ws_fd();
// The message_sender is the transmitter for any component generating sphinx packets // The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real // that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream. // traffic stream.
@@ -755,6 +757,7 @@ where
received_buffer_request_sender, received_buffer_request_sender,
}, },
}, },
gateway_fd,
client_state: ClientState { client_state: ClientState {
shared_lane_queue_lengths, shared_lane_queue_lengths,
reply_controller_sender, reply_controller_sender,
@@ -770,6 +773,7 @@ pub struct BaseClient {
pub client_input: ClientInputStatus, pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus, pub client_output: ClientOutputStatus,
pub client_state: ClientState, pub client_state: ClientState,
pub gateway_fd: Option<RawFd>,
pub task_handle: TaskHandle, pub task_handle: TaskHandle,
} }
@@ -8,6 +8,7 @@ use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter}; pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_sphinx::forwarding::packet::MixPacket; use nym_sphinx::forwarding::packet::MixPacket;
use std::fmt::Debug; use std::fmt::Debug;
use std::os::fd::RawFd;
use thiserror::Error; use thiserror::Error;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@@ -25,6 +26,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
/// This combines combines the functionalities of being able to send and receive mix packets. /// This combines combines the functionalities of being able to send and receive mix packets.
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver { pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey; fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
} }
/// This trait defines the functionality of sending `MixPacket` into the mixnet, /// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -66,6 +68,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn gateway_identity(&self) -> identity::PublicKey { fn gateway_identity(&self) -> identity::PublicKey {
(**self).gateway_identity() (**self).gateway_identity()
} }
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
} }
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -112,6 +117,9 @@ where
fn gateway_identity(&self) -> identity::PublicKey { fn gateway_identity(&self) -> identity::PublicKey {
self.gateway_client.gateway_identity() self.gateway_client.gateway_identity()
} }
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
} }
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -187,6 +195,9 @@ mod nonwasm_sealed {
fn gateway_identity(&self) -> identity::PublicKey { fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity self.local_identity
} }
fn ws_fd(&self) -> Option<RawFd> {
None
}
} }
#[async_trait] #[async_trait]
@@ -259,4 +270,7 @@ impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey { fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity self.dummy_identity
} }
fn ws_fd(&self) -> Option<RawFd> {
None
}
} }
@@ -26,6 +26,8 @@ use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient; use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
@@ -34,6 +36,7 @@ use tungstenite::protocol::Message;
use tokio::time::sleep; use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::MaybeTlsStream;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket; use wasm_utils::websocket::JSWebsocket;
@@ -146,6 +149,21 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity self.gateway_identity
} }
pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => match conn.get_ref() {
MaybeTlsStream::Plain(stream) => Some(stream.as_raw_fd()),
MaybeTlsStream::NativeTls(stream) => Some(stream.as_raw_fd()),
&_ => None,
},
SocketState::PartiallyDelegated(conn) => Some(conn.ws_fd()),
_ => {
log::warn!("No fd yet");
None
}
}
}
pub fn remaining_bandwidth(&self) -> i64 { pub fn remaining_bandwidth(&self) -> i64 {
self.bandwidth_remaining self.bandwidth_remaining
} }
@@ -11,6 +11,7 @@ use futures::{SinkExt, StreamExt};
use log::*; use log::*;
use nym_gateway_requests::registration::handshake::SharedKeys; use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_task::TaskClient; use nym_task::TaskClient;
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc; use std::sync::Arc;
use tungstenite::Message; use tungstenite::Message;
@@ -38,11 +39,15 @@ type WsConn = JSWebsocket;
type SplitStreamReceiver = oneshot::Receiver<Result<SplitStream<WsConn>, GatewayClientError>>; type SplitStreamReceiver = oneshot::Receiver<Result<SplitStream<WsConn>, GatewayClientError>>;
pub(crate) struct PartiallyDelegated { pub(crate) struct PartiallyDelegated {
ws_fd: RawFd,
sink_half: SplitSink<WsConn, Message>, sink_half: SplitSink<WsConn, Message>,
delegated_stream: (SplitStreamReceiver, oneshot::Sender<()>), delegated_stream: (SplitStreamReceiver, oneshot::Sender<()>),
} }
impl PartiallyDelegated { impl PartiallyDelegated {
pub fn ws_fd(&self) -> RawFd {
self.ws_fd
}
fn recover_received_plaintexts(ws_msgs: Vec<Message>, shared_key: &SharedKeys) -> Vec<Vec<u8>> { fn recover_received_plaintexts(ws_msgs: Vec<Message>, shared_key: &SharedKeys) -> Vec<Vec<u8>> {
let mut plaintexts = Vec::with_capacity(ws_msgs.len()); let mut plaintexts = Vec::with_capacity(ws_msgs.len());
for ws_msg in ws_msgs { for ws_msg in ws_msgs {
@@ -92,6 +97,11 @@ impl PartiallyDelegated {
let (notify_sender, notify_receiver) = oneshot::channel(); let (notify_sender, notify_receiver) = oneshot::channel();
let (stream_sender, stream_receiver) = oneshot::channel(); let (stream_sender, stream_receiver) = oneshot::channel();
let ws_fd = match conn.get_ref() {
MaybeTlsStream::Plain(stream) => stream.as_raw_fd(),
MaybeTlsStream::NativeTls(stream) => stream.as_raw_fd(),
_ => 0.into(),
};
let (sink, mut stream) = conn.split(); let (sink, mut stream) = conn.split();
let mixnet_receiver_future = async move { let mixnet_receiver_future = async move {
@@ -141,6 +151,7 @@ impl PartiallyDelegated {
tokio::spawn(mixnet_receiver_future); tokio::spawn(mixnet_receiver_future);
PartiallyDelegated { PartiallyDelegated {
ws_fd,
sink_half: sink, sink_half: sink,
delegated_stream: (stream_receiver, notify_sender), delegated_stream: (stream_receiver, notify_sender),
} }
+15
View File
@@ -958,6 +958,12 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
[[package]]
name = "const-str"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
[[package]] [[package]]
name = "constant_time_eq" name = "constant_time_eq"
version = "0.3.0" version = "0.3.0"
@@ -3722,6 +3728,7 @@ dependencies = [
"clap", "clap",
"clap_complete", "clap_complete",
"clap_complete_fig", "clap_complete_fig",
"const-str",
"log", "log",
"pretty_env_logger", "pretty_env_logger",
"schemars", "schemars",
@@ -3762,6 +3769,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sha2 0.10.8", "sha2 0.10.8",
"si-scale",
"sqlx", "sqlx",
"tap", "tap",
"thiserror", "thiserror",
@@ -4138,6 +4146,7 @@ dependencies = [
"cfg-if", "cfg-if",
"dotenvy", "dotenvy",
"hex-literal", "hex-literal",
"log",
"once_cell", "once_cell",
"schemars", "schemars",
"serde", "serde",
@@ -6221,6 +6230,12 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "si-scale"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44beb68bf488343b13ddbd74d1d5d5e6559a58b6dfaee74eb8d5ed4f7ed7666f"
[[package]] [[package]]
name = "signal-hook" name = "signal-hook"
version = "0.3.17" version = "0.3.17"
+1
View File
@@ -714,6 +714,7 @@ where
client_state, client_state,
reconstructed_receiver, reconstructed_receiver,
started_client.task_handle, started_client.task_handle,
started_client.gateway_fd,
None, None,
)) ))
} }
@@ -16,6 +16,7 @@ use nym_task::{
TaskHandle, TaskHandle,
}; };
use nym_topology::NymTopology; use nym_topology::NymTopology;
use std::os::fd::RawFd;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -44,6 +45,8 @@ pub struct MixnetClient {
pub(crate) task_handle: TaskHandle, pub(crate) task_handle: TaskHandle,
pub(crate) packet_type: Option<PacketType>, pub(crate) packet_type: Option<PacketType>,
pub(crate) gateway_fd: Option<RawFd>,
// internal state used for the `Stream` implementation // internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>, _buffered: Vec<ReconstructedMessage>,
} }
@@ -56,6 +59,7 @@ impl MixnetClient {
client_state: ClientState, client_state: ClientState,
reconstructed_receiver: ReconstructedMessagesReceiver, reconstructed_receiver: ReconstructedMessagesReceiver,
task_handle: TaskHandle, task_handle: TaskHandle,
gateway_fd: Option<RawFd>,
packet_type: Option<PacketType>, packet_type: Option<PacketType>,
) -> Self { ) -> Self {
Self { Self {
@@ -65,6 +69,7 @@ impl MixnetClient {
client_state, client_state,
reconstructed_receiver, reconstructed_receiver,
task_handle, task_handle,
gateway_fd,
packet_type, packet_type,
_buffered: Vec::new(), _buffered: Vec::new(),
} }
@@ -97,6 +102,10 @@ impl MixnetClient {
&self.nym_address &self.nym_address
} }
pub fn gateway_fd(&self) -> Option<RawFd> {
self.gateway_fd
}
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and /// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
/// receive logic in different locations. /// receive logic in different locations.
pub fn split_sender(&self) -> MixnetClientSender { pub fn split_sender(&self) -> MixnetClientSender {