Compare commits

...

3 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 4cc63bac1c Fix Box ws_fd 2024-02-13 12:06:10 +02:00
Bogdan-Ștefan Neacşu 6519bfa533 Add log 2024-02-13 11:45:25 +02:00
Bogdan-Ștefan Neacşu dc9823334a Testing 2024-02-12 20:10:09 +02:00
7 changed files with 72 additions and 0 deletions
@@ -52,6 +52,7 @@ use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
use std::os::fd::RawFd;
use std::path::Path;
use std::sync::Arc;
use url::Url;
@@ -683,6 +684,7 @@ where
packet_stats_reporter.clone(),
);
let gateway_fd = gateway_transceiver.ws_fd();
// The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
@@ -755,6 +757,7 @@ where
received_buffer_request_sender,
},
},
gateway_fd,
client_state: ClientState {
shared_lane_queue_lengths,
reply_controller_sender,
@@ -770,6 +773,7 @@ pub struct BaseClient {
pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub gateway_fd: Option<RawFd>,
pub task_handle: TaskHandle,
}
@@ -8,6 +8,7 @@ use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_sphinx::forwarding::packet::MixPacket;
use std::fmt::Debug;
use std::os::fd::RawFd;
use thiserror::Error;
#[cfg(not(target_arch = "wasm32"))]
@@ -25,6 +26,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
/// This combines combines the functionalities of being able to send and receive mix packets.
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -66,6 +68,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn gateway_identity(&self) -> identity::PublicKey {
(**self).gateway_identity()
}
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -112,6 +117,9 @@ where
fn gateway_identity(&self) -> identity::PublicKey {
self.gateway_client.gateway_identity()
}
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -187,6 +195,9 @@ mod nonwasm_sealed {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
}
fn ws_fd(&self) -> Option<RawFd> {
None
}
}
#[async_trait]
@@ -259,4 +270,7 @@ impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
}
fn ws_fd(&self) -> Option<RawFd> {
None
}
}
@@ -26,6 +26,8 @@ use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::sync::Arc;
use std::time::Duration;
use tungstenite::protocol::Message;
@@ -34,6 +36,7 @@ use tungstenite::protocol::Message;
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
use tokio_tungstenite::MaybeTlsStream;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
@@ -146,6 +149,21 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity
}
pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => match conn.get_ref() {
MaybeTlsStream::Plain(stream) => Some(stream.as_raw_fd()),
MaybeTlsStream::NativeTls(stream) => Some(stream.as_raw_fd()),
&_ => None,
},
SocketState::PartiallyDelegated(conn) => Some(conn.ws_fd()),
_ => {
log::warn!("No fd yet");
None
}
}
}
pub fn remaining_bandwidth(&self) -> i64 {
self.bandwidth_remaining
}
@@ -11,6 +11,7 @@ use futures::{SinkExt, StreamExt};
use log::*;
use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_task::TaskClient;
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use tungstenite::Message;
@@ -38,11 +39,15 @@ type WsConn = JSWebsocket;
type SplitStreamReceiver = oneshot::Receiver<Result<SplitStream<WsConn>, GatewayClientError>>;
pub(crate) struct PartiallyDelegated {
ws_fd: RawFd,
sink_half: SplitSink<WsConn, Message>,
delegated_stream: (SplitStreamReceiver, oneshot::Sender<()>),
}
impl PartiallyDelegated {
pub fn ws_fd(&self) -> RawFd {
self.ws_fd
}
fn recover_received_plaintexts(ws_msgs: Vec<Message>, shared_key: &SharedKeys) -> Vec<Vec<u8>> {
let mut plaintexts = Vec::with_capacity(ws_msgs.len());
for ws_msg in ws_msgs {
@@ -92,6 +97,11 @@ impl PartiallyDelegated {
let (notify_sender, notify_receiver) = oneshot::channel();
let (stream_sender, stream_receiver) = oneshot::channel();
let ws_fd = match conn.get_ref() {
MaybeTlsStream::Plain(stream) => stream.as_raw_fd(),
MaybeTlsStream::NativeTls(stream) => stream.as_raw_fd(),
_ => 0.into(),
};
let (sink, mut stream) = conn.split();
let mixnet_receiver_future = async move {
@@ -141,6 +151,7 @@ impl PartiallyDelegated {
tokio::spawn(mixnet_receiver_future);
PartiallyDelegated {
ws_fd,
sink_half: sink,
delegated_stream: (stream_receiver, notify_sender),
}
+15
View File
@@ -958,6 +958,12 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
[[package]]
name = "const-str"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
[[package]]
name = "constant_time_eq"
version = "0.3.0"
@@ -3722,6 +3728,7 @@ dependencies = [
"clap",
"clap_complete",
"clap_complete_fig",
"const-str",
"log",
"pretty_env_logger",
"schemars",
@@ -3762,6 +3769,7 @@ dependencies = [
"serde",
"serde_json",
"sha2 0.10.8",
"si-scale",
"sqlx",
"tap",
"thiserror",
@@ -4138,6 +4146,7 @@ dependencies = [
"cfg-if",
"dotenvy",
"hex-literal",
"log",
"once_cell",
"schemars",
"serde",
@@ -6221,6 +6230,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "si-scale"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44beb68bf488343b13ddbd74d1d5d5e6559a58b6dfaee74eb8d5ed4f7ed7666f"
[[package]]
name = "signal-hook"
version = "0.3.17"
+1
View File
@@ -714,6 +714,7 @@ where
client_state,
reconstructed_receiver,
started_client.task_handle,
started_client.gateway_fd,
None,
))
}
@@ -16,6 +16,7 @@ use nym_task::{
TaskHandle,
};
use nym_topology::NymTopology;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -44,6 +45,8 @@ pub struct MixnetClient {
pub(crate) task_handle: TaskHandle,
pub(crate) packet_type: Option<PacketType>,
pub(crate) gateway_fd: Option<RawFd>,
// internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>,
}
@@ -56,6 +59,7 @@ impl MixnetClient {
client_state: ClientState,
reconstructed_receiver: ReconstructedMessagesReceiver,
task_handle: TaskHandle,
gateway_fd: Option<RawFd>,
packet_type: Option<PacketType>,
) -> Self {
Self {
@@ -65,6 +69,7 @@ impl MixnetClient {
client_state,
reconstructed_receiver,
task_handle,
gateway_fd,
packet_type,
_buffered: Vec::new(),
}
@@ -97,6 +102,10 @@ impl MixnetClient {
&self.nym_address
}
pub fn gateway_fd(&self) -> Option<RawFd> {
self.gateway_fd
}
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
/// receive logic in different locations.
pub fn split_sender(&self) -> MixnetClientSender {