Compare commits
14 Commits
master
...
mix-tcp-stuff
| Author | SHA1 | Date | |
|---|---|---|---|
| 1b33d8e2c9 | |||
| 2fe282d774 | |||
| abfd121d4e | |||
| 46ebb7039a | |||
| bc28cc70b7 | |||
| 36e8468b73 | |||
| cefb217c25 | |||
| 4adcd32ebf | |||
| 7820b816b5 | |||
| f54cee786c | |||
| edd9fef468 | |||
| eed87ff4c9 | |||
| 575056ac67 | |||
| 34e9822f1d |
Generated
+10
@@ -4138,6 +4138,7 @@ version = "1.1.15"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.21.7",
|
||||
"bincode",
|
||||
"bs58 0.5.1",
|
||||
"cfg-if",
|
||||
"clap 4.5.4",
|
||||
@@ -4180,6 +4181,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-tungstenite",
|
||||
"tokio-util",
|
||||
"tungstenite",
|
||||
"url",
|
||||
"wasm-bindgen",
|
||||
@@ -4253,6 +4255,7 @@ dependencies = [
|
||||
"serde-wasm-bindgen 0.6.5",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tsify",
|
||||
"wasm-bindgen",
|
||||
"wasm-bindgen-futures",
|
||||
@@ -4956,6 +4959,7 @@ dependencies = [
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
"tokio-util",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -5300,6 +5304,7 @@ dependencies = [
|
||||
"tap",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -5359,6 +5364,7 @@ dependencies = [
|
||||
name = "nym-sphinx"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"log",
|
||||
"nym-crypto",
|
||||
"nym-mixnet-contract-common",
|
||||
@@ -5375,8 +5381,10 @@ dependencies = [
|
||||
"nym-topology",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5463,6 +5471,7 @@ dependencies = [
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-types",
|
||||
"serde",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
@@ -5540,6 +5549,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"wasm-bindgen",
|
||||
|
||||
@@ -10,6 +10,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
base64 = "0.21.2"
|
||||
bs58 = { workspace = true }
|
||||
cfg-if = { workspace = true }
|
||||
@@ -26,6 +27,7 @@ tap = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
time = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
@@ -47,7 +49,9 @@ nym-validator-client = { path = "../client-libs/validator-client", default-featu
|
||||
nym-task = { path = "../task" }
|
||||
nym-credential-storage = { path = "../credential-storage" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
nym-client-core-config-types = { path = "./config-types", features = ["disk-persistence"] }
|
||||
nym-client-core-config-types = { path = "./config-types", features = [
|
||||
"disk-persistence",
|
||||
] }
|
||||
nym-client-core-surb-storage = { path = "./surb-storage" }
|
||||
nym-client-core-gateways-storage = { path = "./gateways-storage" }
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ use crate::init::{
|
||||
};
|
||||
use crate::{config, spawn_future};
|
||||
use futures::channel::mpsc;
|
||||
use futures::SinkExt;
|
||||
use log::{debug, error, info, warn};
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
|
||||
@@ -59,6 +60,7 @@ use std::fmt::Debug;
|
||||
use std::os::raw::c_int as RawFd;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::{PollSendError, PollSender};
|
||||
use url::Url;
|
||||
|
||||
#[cfg(all(
|
||||
@@ -78,10 +80,7 @@ pub struct ClientInput {
|
||||
}
|
||||
|
||||
impl ClientInput {
|
||||
pub async fn send(
|
||||
&self,
|
||||
message: InputMessage,
|
||||
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
|
||||
pub async fn send(&mut self, message: InputMessage) -> Result<(), PollSendError<InputMessage>> {
|
||||
self.input_sender.send(message).await
|
||||
}
|
||||
}
|
||||
@@ -804,7 +803,7 @@ where
|
||||
client_input: ClientInputStatus::AwaitingProducer {
|
||||
client_input: ClientInput {
|
||||
connection_command_sender: client_connection_tx,
|
||||
input_sender,
|
||||
input_sender: PollSender::new(input_sender),
|
||||
},
|
||||
},
|
||||
client_output: ClientOutputStatus::AwaitingConsumer {
|
||||
|
||||
@@ -1,16 +1,26 @@
|
||||
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::convert::TryInto;
|
||||
use tokio_util::{
|
||||
bytes::Buf,
|
||||
bytes::BytesMut,
|
||||
codec::{Decoder, Encoder},
|
||||
};
|
||||
|
||||
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
|
||||
use crate::error::ClientCoreError;
|
||||
|
||||
pub type InputMessageSender = tokio_util::sync::PollSender<InputMessage>;
|
||||
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
const LENGHT_ENCODING_PREFIX_SIZE: usize = 4;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum InputMessage {
|
||||
/// Fire an already prepared mix packets into the network.
|
||||
/// No guarantees are made about it. For example no retransmssion
|
||||
@@ -64,6 +74,10 @@ pub enum InputMessage {
|
||||
}
|
||||
|
||||
impl InputMessage {
|
||||
pub fn simple(data: &[u8], recipient: Recipient) -> Self {
|
||||
InputMessage::new_regular(recipient, data.to_vec(), TransmissionLane::General, None)
|
||||
}
|
||||
|
||||
pub fn new_premade(
|
||||
msgs: Vec<MixPacket>,
|
||||
lane: TransmissionLane,
|
||||
@@ -197,4 +211,70 @@ impl InputMessage {
|
||||
InputMessage::MessageWrapper { message, .. } => message.lane(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialized_size(&self) -> u64 {
|
||||
bincode::serialized_size(self).expect("failed to get serialized InputMessage size")
|
||||
+ LENGHT_ENCODING_PREFIX_SIZE as u64
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Tests
|
||||
pub struct AdressedInputMessageCodec(pub Recipient);
|
||||
|
||||
impl Encoder<&[u8]> for AdressedInputMessageCodec {
|
||||
type Error = ClientCoreError;
|
||||
|
||||
fn encode(&mut self, item: &[u8], buf: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let mut codec = InputMessageCodec;
|
||||
let input_message = InputMessage::simple(item, self.0.clone());
|
||||
codec.encode(input_message, buf)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InputMessageCodec;
|
||||
|
||||
impl Encoder<InputMessage> for InputMessageCodec {
|
||||
type Error = ClientCoreError;
|
||||
|
||||
fn encode(&mut self, item: InputMessage, buf: &mut BytesMut) -> Result<(), Self::Error> {
|
||||
let encoded = bincode::serialize(&item).expect("failed to serialize InputMessage");
|
||||
let encoded_len = encoded.len() as u32;
|
||||
let mut encoded_with_len = encoded_len.to_le_bytes().to_vec();
|
||||
encoded_with_len.extend(encoded);
|
||||
buf.reserve(encoded_with_len.len());
|
||||
buf.extend_from_slice(&encoded_with_len);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder for InputMessageCodec {
|
||||
type Item = InputMessage;
|
||||
type Error = ClientCoreError;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
if buf.len() < LENGHT_ENCODING_PREFIX_SIZE {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let len = u32::from_le_bytes(
|
||||
buf[0..LENGHT_ENCODING_PREFIX_SIZE]
|
||||
.try_into()
|
||||
.expect("Could not coarce to array"),
|
||||
) as usize;
|
||||
if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let decoded = match bincode::deserialize(
|
||||
&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE],
|
||||
) {
|
||||
Ok(decoded) => decoded,
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
|
||||
buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE);
|
||||
|
||||
Ok(Some(decoded))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,9 @@ log = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_distr = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
|
||||
nym-sphinx-acknowledgements = { path = "acknowledgements" }
|
||||
nym-sphinx-addressing = { path = "addressing" }
|
||||
@@ -30,7 +33,9 @@ nym-topology = { path = "../topology" }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-crypto = { path = "../crypto", version = "0.4.0", features = ["asymmetric"] }
|
||||
nym-crypto = { path = "../crypto", version = "0.4.0", features = [
|
||||
"asymmetric",
|
||||
] }
|
||||
|
||||
# do not include this when compiling into wasm as it somehow when combined together with reqwest, it will require
|
||||
# net2 via tokio-util -> tokio -> mio -> net2
|
||||
@@ -43,5 +48,13 @@ features = ["sync"]
|
||||
|
||||
[features]
|
||||
default = ["sphinx"]
|
||||
sphinx = ["nym-crypto/sphinx", "nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"]
|
||||
outfox = ["nym-crypto/outfox", "nym-sphinx-params/outfox", "nym-sphinx-types/outfox"]
|
||||
sphinx = [
|
||||
"nym-crypto/sphinx",
|
||||
"nym-sphinx-params/sphinx",
|
||||
"nym-sphinx-types/sphinx",
|
||||
]
|
||||
outfox = [
|
||||
"nym-crypto/outfox",
|
||||
"nym-sphinx-params/outfox",
|
||||
"nym-sphinx-types/outfox",
|
||||
]
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::{ReplySurb, ReplySurbError};
|
||||
use nym_sphinx_addressing::clients::{Recipient, RecipientFormattingError};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::mem;
|
||||
use thiserror::Error;
|
||||
@@ -24,7 +25,7 @@ pub enum InvalidAnonymousSenderTagRepresentation {
|
||||
InvalidLength { received: usize, expected: usize },
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen)]
|
||||
pub struct AnonymousSenderTag([u8; SENDER_TAG_SIZE]);
|
||||
|
||||
|
||||
@@ -13,3 +13,4 @@ nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
|
||||
nym-outfox = { path = "../../../nym-outfox" }
|
||||
thiserror = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
@@ -4,6 +4,10 @@
|
||||
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
|
||||
use nym_sphinx_params::{PacketSize, PacketType};
|
||||
use nym_sphinx_types::{NymPacket, NymPacketError};
|
||||
use serde::{
|
||||
de::{self, Visitor},
|
||||
Deserialize, Deserializer, Serialize, Serializer,
|
||||
};
|
||||
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use thiserror::Error;
|
||||
@@ -110,6 +114,39 @@ impl MixPacket {
|
||||
.chain(self.packet.to_bytes()?)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> Result<Vec<u8>, MixPacketFormattingError> {
|
||||
Ok(std::iter::once(self.packet_type as u8)
|
||||
.chain(self.next_hop.as_bytes())
|
||||
.chain(self.packet.to_bytes()?)
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for MixPacket {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
serializer.serialize_bytes(&self.to_bytes().map_err(serde::ser::Error::custom)?)
|
||||
}
|
||||
}
|
||||
|
||||
struct MixPacketVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for MixPacketVisitor {
|
||||
type Value = MixPacket;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a byte array representing a mix packet")
|
||||
}
|
||||
|
||||
fn visit_bytes<E: de::Error>(self, v: &[u8]) -> Result<Self::Value, E> {
|
||||
MixPacket::try_from_bytes(v).map_err(serde::de::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for MixPacket {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
deserializer.deserialize_bytes(MixPacketVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: test for serialization and errors!
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::io;
|
||||
|
||||
use crate::message::{NymMessage, NymMessageError, PaddedMessage, PlainMessage};
|
||||
use log::debug;
|
||||
use nym_crypto::aes::cipher::{KeyIvInit, StreamCipher};
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_crypto::shared_key::recompute_shared_key;
|
||||
@@ -16,10 +19,13 @@ use nym_sphinx_params::{
|
||||
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm,
|
||||
DEFAULT_NUM_MIX_HOPS,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tokio_util::bytes::{Buf, BytesMut};
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
// TODO: should this live in this file?
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ReconstructedMessage {
|
||||
/// The actual plaintext message that was received.
|
||||
pub message: Vec<u8>,
|
||||
@@ -57,6 +63,62 @@ impl From<PlainMessage> for ReconstructedMessage {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReconstructedMessageCodec;
|
||||
const LENGHT_ENCODING_PREFIX_SIZE: usize = 4;
|
||||
|
||||
impl Encoder<ReconstructedMessage> for ReconstructedMessageCodec {
|
||||
type Error = MessageRecoveryError;
|
||||
|
||||
fn encode(
|
||||
&mut self,
|
||||
item: ReconstructedMessage,
|
||||
buf: &mut BytesMut,
|
||||
) -> Result<(), Self::Error> {
|
||||
let encoded = bincode::serialize(&item).expect("failed to serialize ReconstructedMessage");
|
||||
let encoded_len = encoded.len() as u32;
|
||||
let mut encoded_with_len = encoded_len.to_le_bytes().to_vec();
|
||||
encoded_with_len.extend(encoded);
|
||||
buf.reserve(encoded_with_len.len());
|
||||
buf.extend_from_slice(&encoded_with_len);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder for ReconstructedMessageCodec {
|
||||
type Item = ReconstructedMessage;
|
||||
type Error = MessageRecoveryError;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||
if buf.len() < LENGHT_ENCODING_PREFIX_SIZE {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let len = u32::from_le_bytes(
|
||||
buf[0..LENGHT_ENCODING_PREFIX_SIZE]
|
||||
.try_into()
|
||||
.expect("We know that we have at least LENGHT_ENCODING_PREFIX_SIZE bytes in there"),
|
||||
) as usize;
|
||||
|
||||
if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let decoded = match bincode::deserialize(
|
||||
&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE],
|
||||
) {
|
||||
Ok(decoded) => decoded,
|
||||
Err(e) => {
|
||||
debug!("Failed to decode the message - {:?}", e);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE);
|
||||
|
||||
Ok(Some(decoded))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MessageRecoveryError {
|
||||
#[error("The received message did not contain enough bytes to recover the ephemeral public key. Got {provided}. required: {required}")]
|
||||
@@ -74,6 +136,9 @@ pub enum MessageRecoveryError {
|
||||
|
||||
#[error("Failed to recover message fragment - {0}")]
|
||||
FragmentRecoveryError(#[from] ChunkingError),
|
||||
|
||||
#[error("Failed to recover message fragment - {0}")]
|
||||
MessageRecoveryError(#[from] io::Error),
|
||||
}
|
||||
|
||||
pub trait MessageReceiver {
|
||||
|
||||
@@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] } # for config serialization/d
|
||||
tap = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
||||
nym-bandwidth-controller = { path = "../../common/bandwidth-controller" }
|
||||
|
||||
@@ -7,6 +7,7 @@ use super::{SocksVersion, RESERVED, SOCKS4_VERSION, SOCKS5_VERSION};
|
||||
use crate::config;
|
||||
use futures::channel::mpsc;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::SinkExt;
|
||||
use log::*;
|
||||
use nym_client_core::client::inbound_messages::{InputMessage, InputMessageSender};
|
||||
use nym_service_providers_common::interface::{ProviderInterfaceVersion, RequestVersion};
|
||||
|
||||
@@ -3,11 +3,12 @@
|
||||
|
||||
use crate::proxy_runner::MixProxySender;
|
||||
use bytes::Bytes;
|
||||
use futures::SinkExt;
|
||||
use log::{debug, error};
|
||||
use nym_socks5_requests::{ConnectionId, SocketData};
|
||||
use std::io;
|
||||
|
||||
pub(crate) struct OrderedMessageSender<F, S> {
|
||||
pub(crate) struct OrderedMessageSender<F, S: Send + 'static> {
|
||||
connection_id: ConnectionId,
|
||||
// addresses are provided for better logging
|
||||
local_destination_address: String,
|
||||
@@ -18,7 +19,7 @@ pub(crate) struct OrderedMessageSender<F, S> {
|
||||
mix_message_adapter: F,
|
||||
}
|
||||
|
||||
impl<F, S> OrderedMessageSender<F, S>
|
||||
impl<F, S: Send + 'static> OrderedMessageSender<F, S>
|
||||
where
|
||||
F: Fn(SocketData) -> S,
|
||||
{
|
||||
@@ -55,7 +56,7 @@ where
|
||||
(self.mix_message_adapter)(data)
|
||||
}
|
||||
|
||||
async fn send_message(&self, message: S) {
|
||||
async fn send_message(&mut self, message: S) {
|
||||
if self.mixnet_sender.send(message).await.is_err() {
|
||||
panic!("BatchRealMessageReceiver has stopped receiving!")
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ async fn wait_for_lane(
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run_inbound<F, S>(
|
||||
pub(super) async fn run_inbound<F, S: Send>(
|
||||
mut reader: OwnedReadHalf,
|
||||
mut message_sender: OrderedMessageSender<F, S>,
|
||||
connection_id: ConnectionId,
|
||||
|
||||
@@ -9,6 +9,7 @@ use nym_task::TaskClient;
|
||||
use std::fmt::Debug;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::{net::TcpStream, sync::Notify};
|
||||
use tokio_util::sync::PollSender;
|
||||
|
||||
mod inbound;
|
||||
mod outbound;
|
||||
@@ -35,7 +36,7 @@ impl From<(Vec<u8>, bool)> for ProxyMessage {
|
||||
}
|
||||
}
|
||||
|
||||
pub type MixProxySender<S> = tokio::sync::mpsc::Sender<S>;
|
||||
pub type MixProxySender<S> = PollSender<S>;
|
||||
pub type MixProxyReader<S> = tokio::sync::mpsc::Receiver<S>;
|
||||
|
||||
// TODO: when we finally get to implementing graceful shutdown,
|
||||
|
||||
@@ -19,7 +19,7 @@ pub trait StatisticsCollector {
|
||||
interval: Duration,
|
||||
timestamp: DateTime<Utc>,
|
||||
) -> StatsMessage;
|
||||
async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError>;
|
||||
async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError>;
|
||||
async fn reset_stats(&mut self);
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ futures = { workspace = true }
|
||||
log = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync"] }
|
||||
serde = { workspace = true }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
workspace = true
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub type ConnectionId = u64;
|
||||
|
||||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
|
||||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum TransmissionLane {
|
||||
General,
|
||||
// we need to treat surb-related requests and responses at higher priority
|
||||
|
||||
@@ -52,7 +52,7 @@ impl StatisticsCollector for GatewayStatisticsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError> {
|
||||
async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError> {
|
||||
build_and_send_statistics_request(stats_message, self.statistics_service_url.to_string())
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -9,7 +9,10 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
bip39 = { workspace = true }
|
||||
nym-client-core = { path = "../../../common/client-core", features = ["fs-surb-storage", "fs-gateways-storage"] }
|
||||
nym-client-core = { path = "../../../common/client-core", features = [
|
||||
"fs-surb-storage",
|
||||
"fs-gateways-storage",
|
||||
] }
|
||||
nym-crypto = { path = "../../../common/crypto" }
|
||||
nym-gateway-requests = { path = "../../../gateway/gateway-requests" }
|
||||
nym-bandwidth-controller = { path = "../../../common/bandwidth-controller" }
|
||||
@@ -21,7 +24,9 @@ nym-sphinx = { path = "../../../common/nymsphinx" }
|
||||
nym-task = { path = "../../../common/task" }
|
||||
nym-topology = { path = "../../../common/topology" }
|
||||
nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
|
||||
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] }
|
||||
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = [
|
||||
"http-client",
|
||||
] }
|
||||
nym-socks5-requests = { path = "../../../common/socks5/requests" }
|
||||
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
|
||||
nym-service-providers-common = { path = "../../../service-providers/common" }
|
||||
@@ -37,6 +42,8 @@ tap = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
url = { workspace = true }
|
||||
toml = "0.5.10"
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
|
||||
@@ -16,7 +16,7 @@ async fn main() {
|
||||
let our_address = *client.nym_address();
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
let sender = client.split_sender();
|
||||
let mut sender = client.split_sender();
|
||||
|
||||
// receiving task
|
||||
let receiving_task_handle = tokio::spawn(async move {
|
||||
|
||||
@@ -2,9 +2,11 @@ use crate::mixnet::client::MixnetClientBuilder;
|
||||
use crate::mixnet::traits::MixnetMessageSender;
|
||||
use crate::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
use bytes::{Buf as _, BytesMut};
|
||||
use futures::{ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
|
||||
use log::error;
|
||||
use nym_client_core::client::base_client::GatewayConnection;
|
||||
use nym_client_core::client::inbound_messages::InputMessageCodec;
|
||||
use nym_client_core::client::{
|
||||
base_client::{ClientInput, ClientOutput, ClientState},
|
||||
inbound_messages::InputMessage,
|
||||
@@ -12,15 +14,18 @@ use nym_client_core::client::{
|
||||
};
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::receiver::ReconstructedMessageCodec;
|
||||
use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
|
||||
use nym_task::{
|
||||
connections::{ConnectionCommandSender, LaneQueueLengths},
|
||||
TaskHandle,
|
||||
};
|
||||
use nym_topology::NymTopology;
|
||||
use std::pin::Pin;
|
||||
use std::pin::{pin, Pin};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio_util::codec::{Encoder, FramedRead};
|
||||
|
||||
/// Client connected to the Nym mixnet.
|
||||
pub struct MixnetClient {
|
||||
@@ -51,6 +56,24 @@ pub struct MixnetClient {
|
||||
|
||||
// internal state used for the `Stream` implementation
|
||||
_buffered: Vec<ReconstructedMessage>,
|
||||
|
||||
// internal state used for the `AsyncRead` implementation
|
||||
_read: ReadBuffer,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct ReadBuffer {
|
||||
buffer: BytesMut,
|
||||
}
|
||||
|
||||
impl ReadBuffer {
|
||||
fn clear(&mut self) {
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
||||
fn pending(&self) -> bool {
|
||||
!self.buffer.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl MixnetClient {
|
||||
@@ -75,6 +98,7 @@ impl MixnetClient {
|
||||
task_handle,
|
||||
packet_type,
|
||||
_buffered: Vec::new(),
|
||||
_read: ReadBuffer::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,6 +214,25 @@ impl MixnetClient {
|
||||
// note: it's important to take ownership of the struct as if the shutdown is `TaskHandle::External`,
|
||||
// it must be dropped to finalize the shutdown
|
||||
}
|
||||
|
||||
fn read_buffer_to_slice(
|
||||
&mut self,
|
||||
buf: &mut ReadBuf,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<tokio::io::Result<()>> {
|
||||
if self._read.buffer.len() < buf.capacity() {
|
||||
// let written = self._read.buffer.len();
|
||||
buf.put_slice(&self._read.buffer);
|
||||
self._read.clear();
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
let written = buf.capacity();
|
||||
buf.put_slice(&self._read.buffer.split_off(written));
|
||||
self._read.buffer.advance(written);
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -198,6 +241,200 @@ pub struct MixnetClientSender {
|
||||
packet_type: Option<PacketType>,
|
||||
}
|
||||
|
||||
impl AsyncRead for MixnetClient {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf,
|
||||
) -> Poll<tokio::io::Result<()>> {
|
||||
let mut codec = ReconstructedMessageCodec {};
|
||||
|
||||
if self._read.pending() {
|
||||
return self.read_buffer_to_slice(buf, cx);
|
||||
}
|
||||
|
||||
let msg = match self.as_mut().poll_next(cx) {
|
||||
Poll::Ready(Some(msg)) => msg,
|
||||
Poll::Ready(None) => return Poll::Ready(Ok(())),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
};
|
||||
|
||||
match codec.encode(msg, &mut self._read.buffer) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("failed to encode reconstructed message: {:?}", e);
|
||||
return Poll::Ready(Err(tokio::io::Error::new(
|
||||
tokio::io::ErrorKind::Other,
|
||||
"failed to encode reconstructed message",
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
self.read_buffer_to_slice(buf, cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for MixnetClient {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
let codec = InputMessageCodec {};
|
||||
let mut reader = FramedRead::new(buf, codec);
|
||||
let mut fut = reader.next();
|
||||
let msg = match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(msg))) => msg,
|
||||
Poll::Ready(Some(Err(_))) => {
|
||||
return Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"failed to read message from input",
|
||||
)))
|
||||
}
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(None) => return Poll::Ready(Ok(0)),
|
||||
};
|
||||
|
||||
let msg_size = msg.serialized_size();
|
||||
|
||||
let mut fut = pin!(self.client_input.send(msg));
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"failed to send message to mixnet",
|
||||
))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
|
||||
Sink::poll_flush(self, cx)
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink"))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
|
||||
AsyncWrite::poll_flush(self, cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink<InputMessage> for MixnetClient {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
match self.sender().poll_ready_unpin(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> {
|
||||
self.sender()
|
||||
.start_send_unpin(item)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.sender()
|
||||
.poll_flush_unpin(cx)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.sender()
|
||||
.poll_close_unpin(cx)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: there should be a better way of implementing Sink and AsyncWrite over T: MixnetMessageSender
|
||||
impl AsyncWrite for MixnetClientSender {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
let codec = InputMessageCodec {};
|
||||
let mut reader = FramedRead::new(buf, codec);
|
||||
let mut fut = reader.next();
|
||||
let msg = match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(msg))) => msg,
|
||||
Poll::Ready(Some(Err(_))) => {
|
||||
return Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"failed to read message from input",
|
||||
)))
|
||||
}
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(None) => return Poll::Ready(Ok(0)),
|
||||
};
|
||||
|
||||
let msg_size = msg.serialized_size();
|
||||
|
||||
let mut fut = pin!(self.client_input.send(msg));
|
||||
match fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"failed to send message to mixnet",
|
||||
))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
|
||||
Sink::poll_flush(self, cx)
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink"))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
|
||||
AsyncWrite::poll_flush(self, cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink<InputMessage> for MixnetClientSender {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
match self.sender().poll_ready_unpin(cx) {
|
||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
||||
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> {
|
||||
self.sender()
|
||||
.start_send_unpin(item)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.sender()
|
||||
.poll_flush_unpin(cx)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
self.sender()
|
||||
.poll_close_unpin(cx)
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MixnetClient {
|
||||
type Item = ReconstructedMessage;
|
||||
|
||||
@@ -234,12 +471,16 @@ impl MixnetMessageSender for MixnetClient {
|
||||
self.packet_type
|
||||
}
|
||||
|
||||
async fn send(&self, message: InputMessage) -> Result<()> {
|
||||
async fn send(&mut self, message: InputMessage) -> Result<()> {
|
||||
self.client_input
|
||||
.send(message)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage> {
|
||||
&mut self.client_input.input_sender
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -248,10 +489,14 @@ impl MixnetMessageSender for MixnetClientSender {
|
||||
self.packet_type
|
||||
}
|
||||
|
||||
async fn send(&self, message: InputMessage) -> Result<()> {
|
||||
async fn send(&mut self, message: InputMessage) -> Result<()> {
|
||||
self.client_input
|
||||
.send(message)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
|
||||
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage> {
|
||||
&mut self.client_input.input_sender
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,9 +16,11 @@ pub trait MixnetMessageSender {
|
||||
None
|
||||
}
|
||||
|
||||
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage>;
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
|
||||
/// full customization.
|
||||
async fn send(&self, message: InputMessage) -> Result<()>;
|
||||
async fn send(&mut self, message: InputMessage) -> Result<()>;
|
||||
|
||||
/// Sends data to the supplied Nym address with the default surb behaviour.
|
||||
///
|
||||
@@ -35,7 +37,7 @@ pub trait MixnetMessageSender {
|
||||
/// client.send_plain_message(recipient, "hi").await.unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
|
||||
async fn send_plain_message<M>(&mut self, address: Recipient, message: M) -> Result<()>
|
||||
where
|
||||
M: AsRef<[u8]> + Send,
|
||||
{
|
||||
@@ -61,7 +63,7 @@ pub trait MixnetMessageSender {
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_message<M>(
|
||||
&self,
|
||||
&mut self,
|
||||
address: Recipient,
|
||||
message: M,
|
||||
surbs: IncludedSurbs,
|
||||
@@ -103,7 +105,7 @@ pub trait MixnetMessageSender {
|
||||
/// client.send_reply(tag, b"hi").await.unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
|
||||
async fn send_reply<M>(&mut self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
|
||||
where
|
||||
M: AsRef<[u8]> + Send,
|
||||
{
|
||||
|
||||
@@ -605,7 +605,7 @@ impl MixnetListener {
|
||||
|
||||
// When an incoming mixnet message triggers a response that we send back, such as during
|
||||
// connect handshake.
|
||||
async fn handle_response(&self, response: IpPacketResponse) -> Result<()> {
|
||||
async fn handle_response(&mut self, response: IpPacketResponse) -> Result<()> {
|
||||
let Some(recipient) = response.recipient() else {
|
||||
log::error!("No recipient in response packet, this should NOT happen!");
|
||||
return Err(IpPacketRouterError::NoRecipientInResponse);
|
||||
@@ -635,7 +635,7 @@ impl MixnetListener {
|
||||
|
||||
// A single incoming request can trigger multiple responses, such as when data requests contain
|
||||
// multiple IP packets.
|
||||
async fn handle_responses(&self, responses: Vec<PacketHandleResult>) {
|
||||
async fn handle_responses(&mut self, responses: Vec<PacketHandleResult>) {
|
||||
for response in responses {
|
||||
match response {
|
||||
Ok(Some(response)) => {
|
||||
|
||||
@@ -20,7 +20,7 @@ anyhow = { workspace = true }
|
||||
addr = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bs58 = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive"]}
|
||||
clap = { workspace = true, features = ["cargo", "derive"] }
|
||||
dirs = "4.0"
|
||||
futures = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
@@ -33,19 +33,26 @@ regex = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"]}
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"] }
|
||||
tap = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = [ "net", "rt-multi-thread", "macros" ] }
|
||||
tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
url = { workspace = true }
|
||||
time = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
# internal
|
||||
nym-async-file-watcher = { path = "../../common/async-file-watcher" }
|
||||
nym-bin-common = { path = "../../common/bin-common", features = ["output_format"] }
|
||||
nym-client-core = { path = "../../common/client-core", features = ["cli", "fs-gateways-storage", "fs-surb-storage"] }
|
||||
nym-bin-common = { path = "../../common/bin-common", features = [
|
||||
"output_format",
|
||||
] }
|
||||
nym-client-core = { path = "../../common/client-core", features = [
|
||||
"cli",
|
||||
"fs-gateways-storage",
|
||||
"fs-surb-storage",
|
||||
] }
|
||||
nym-client-websocket-requests = { path = "../../clients/native/websocket-requests" }
|
||||
nym-config = { path = "../../common/config" }
|
||||
nym-credentials = { path = "../../common/credentials" }
|
||||
|
||||
@@ -10,13 +10,14 @@ use crate::{reply, socks5};
|
||||
use async_trait::async_trait;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::stream::StreamExt;
|
||||
use futures::SinkExt;
|
||||
use log::{debug, warn};
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_client_core::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use nym_client_core::config::disk_persistence::CommonClientPaths;
|
||||
use nym_client_core::HardcodedTopologyProvider;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_sdk::mixnet::{MixnetMessageSender, TopologyProvider};
|
||||
use nym_sdk::mixnet::TopologyProvider;
|
||||
use nym_service_providers_common::interface::{
|
||||
BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion,
|
||||
};
|
||||
@@ -40,6 +41,7 @@ use nym_task::manager::TaskHandle;
|
||||
use nym_task::TaskClient;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio_util::sync::PollSender;
|
||||
|
||||
// Since it's an atomic, it's safe to be kept static and shared across threads
|
||||
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
|
||||
@@ -284,6 +286,8 @@ impl NRServiceProviderBuilder {
|
||||
// going to be used by `mixnet_response_listener`
|
||||
let (mix_input_sender, mix_input_receiver) = tokio::sync::mpsc::channel::<MixnetMessage>(1);
|
||||
|
||||
let mix_input_sender = PollSender::new(mix_input_sender);
|
||||
|
||||
// Controller for managing all active connections.
|
||||
let (mut active_connections_controller, controller_sender) = Controller::new(
|
||||
mixnet_client.connection_command_sender(),
|
||||
@@ -400,7 +404,7 @@ impl NRServiceProvider {
|
||||
/// Listens for any messages from `mix_reader` that should be written back to the mix network
|
||||
/// via the `websocket_writer`.
|
||||
async fn mixnet_response_listener(
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mut mix_input_reader: MixProxyReader<MixnetMessage>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
packet_type: PacketType,
|
||||
@@ -425,7 +429,7 @@ impl NRServiceProvider {
|
||||
}
|
||||
|
||||
let response_message = msg.into_input_message(packet_type);
|
||||
mixnet_client_sender.send(response_message).await.unwrap();
|
||||
nym_sdk::mixnet::MixnetMessageSender::send(&mut mixnet_client_sender, response_message).await.unwrap();
|
||||
} else {
|
||||
log::error!("Exiting: channel closed!");
|
||||
break;
|
||||
@@ -443,7 +447,7 @@ impl NRServiceProvider {
|
||||
return_address: reply::MixnetAddress,
|
||||
biggest_packet_size: PacketSize,
|
||||
controller_sender: ControllerSender,
|
||||
mix_input_sender: MixProxySender<MixnetMessage>,
|
||||
mut mix_input_sender: MixProxySender<MixnetMessage>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
@@ -537,7 +541,7 @@ impl NRServiceProvider {
|
||||
.unwrap_or(traffic_config.primary_packet_size);
|
||||
|
||||
let controller_sender_clone = self.controller_sender.clone();
|
||||
let mix_input_sender_clone = self.mix_input_sender.clone();
|
||||
let mut mix_input_sender_clone = self.mix_input_sender.clone();
|
||||
let lane_queue_lengths_clone = self.mixnet_client.shared_lane_queue_lengths();
|
||||
let mut shutdown = self.shutdown.get_handle();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use super::error::StatsError;
|
||||
use crate::core::new_legacy_request_version;
|
||||
use crate::reply::MixnetMessage;
|
||||
use async_trait::async_trait;
|
||||
use futures::SinkExt;
|
||||
use log::*;
|
||||
use nym_service_providers_common::interface::RequestVersion;
|
||||
use nym_socks5_proxy_helpers::proxy_runner::MixProxySender;
|
||||
@@ -165,7 +166,7 @@ impl StatisticsCollector for ServiceStatisticsCollector {
|
||||
}
|
||||
|
||||
async fn send_stats_message(
|
||||
&self,
|
||||
&mut self,
|
||||
stats_message: StatsMessage,
|
||||
) -> Result<(), CommonStatsError> {
|
||||
let msg = build_statistics_request_bytes(stats_message)?;
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
[package]
|
||||
name = "nym-client-wasm"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jedrzej Stuczynski <andrew@nymtech.net>"]
|
||||
authors = [
|
||||
"Dave Hrycyszyn <futurechimp@users.noreply.github.com>",
|
||||
"Jedrzej Stuczynski <andrew@nymtech.net>",
|
||||
]
|
||||
version = "1.3.0-rc.0"
|
||||
edition = "2021"
|
||||
keywords = ["nym", "sphinx", "wasm", "webassembly", "privacy", "client"]
|
||||
@@ -22,15 +25,16 @@ serde_json = { workspace = true }
|
||||
serde-wasm-bindgen = { workspace = true }
|
||||
wasm-bindgen = { workspace = true }
|
||||
wasm-bindgen-futures = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tsify = { workspace = true, features = ["js"] }
|
||||
tokio = { workspace = true, default-features = false, features = ["sync"] }
|
||||
|
||||
nym-bin-common = { path = "../../common/bin-common" }
|
||||
wasm-client-core = { path = "../../common/wasm/client-core" }
|
||||
wasm-utils = { path = "../../common/wasm/utils" }
|
||||
|
||||
nym-node-tester-utils = { path = "../../common/node-tester-utils", optional = true }
|
||||
nym-node-tester-wasm = { path = "../node-tester", optional = true}
|
||||
nym-node-tester-wasm = { path = "../node-tester", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
wasm-bindgen-test = { workspace = true }
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::response_pusher::ResponsePusher;
|
||||
use js_sys::Promise;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tsify::Tsify;
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_bindgen_futures::future_to_promise;
|
||||
@@ -50,7 +51,7 @@ pub(crate) const NODE_TESTER_CLIENT_ID: &str = "_nym-node-tester-client";
|
||||
#[wasm_bindgen]
|
||||
pub struct NymClient {
|
||||
self_address: String,
|
||||
client_input: Arc<ClientInput>,
|
||||
client_input: Arc<RwLock<ClientInput>>,
|
||||
client_state: Arc<ClientState>,
|
||||
|
||||
// keep track of the "old" topology for the purposes of node tester
|
||||
@@ -196,7 +197,7 @@ impl NymClientBuilder {
|
||||
|
||||
Ok(NymClient {
|
||||
self_address,
|
||||
client_input: Arc::new(client_input),
|
||||
client_input: Arc::new(RwLock::new(client_input)),
|
||||
client_state: Arc::new(started_client.client_state),
|
||||
_full_topology: None,
|
||||
// this cannot failed as we haven't passed an external task manager
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::SinkExt;
|
||||
use js_sys::Promise;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use wasm_bindgen::JsValue;
|
||||
use wasm_bindgen_futures::future_to_promise;
|
||||
use wasm_client_core::client::base_client::{ClientInput, ClientState};
|
||||
@@ -48,10 +50,11 @@ pub(crate) trait InputSender {
|
||||
fn send_messages(&self, messages: Vec<InputMessage>) -> Promise;
|
||||
}
|
||||
|
||||
impl InputSender for Arc<ClientInput> {
|
||||
impl InputSender for Arc<RwLock<ClientInput>> {
|
||||
fn send_message(&self, message: InputMessage) -> Promise {
|
||||
let this = Arc::clone(self);
|
||||
future_to_promise(async move {
|
||||
let mut this = this.write().await;
|
||||
match this.input_sender.send(message).await {
|
||||
Ok(_) => Ok(JsValue::null()),
|
||||
Err(_) => Err(simple_js_error(
|
||||
@@ -64,6 +67,7 @@ impl InputSender for Arc<ClientInput> {
|
||||
fn send_messages(&self, messages: Vec<InputMessage>) -> Promise {
|
||||
let this = Arc::clone(self);
|
||||
future_to_promise(async move {
|
||||
let mut this = this.write().await;
|
||||
for message in messages {
|
||||
if this.input_sender.send(message).await.is_err() {
|
||||
return Err(simple_js_error(
|
||||
|
||||
@@ -24,7 +24,7 @@ tokio = { workspace = true, features = ["sync"] }
|
||||
url = { workspace = true }
|
||||
wasm-bindgen = { workspace = true }
|
||||
wasm-bindgen-futures = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tsify = { workspace = true, features = ["js"] }
|
||||
|
||||
nym-bin-common = { path = "../../common/bin-common" }
|
||||
|
||||
@@ -8,10 +8,13 @@ use crate::go_bridge::goWasmSetMixFetchRequestTimeout;
|
||||
use crate::request_writer::RequestWriter;
|
||||
use crate::socks_helpers::{socks5_connect_request, socks5_data_request};
|
||||
use crate::{config, RequestId};
|
||||
use futures::SinkExt;
|
||||
use js_sys::Promise;
|
||||
use nym_socks5_requests::RemoteAddress;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_bindgen_futures::future_to_promise;
|
||||
use wasm_client_core::client::base_client::{BaseClientBuilder, ClientInput, ClientOutput};
|
||||
@@ -34,7 +37,7 @@ pub struct MixFetchClient {
|
||||
|
||||
self_address: Recipient,
|
||||
|
||||
client_input: ClientInput,
|
||||
client_input: Arc<RwLock<ClientInput>>,
|
||||
|
||||
requests: ActiveRequests,
|
||||
|
||||
@@ -131,7 +134,7 @@ impl MixFetchClientBuilder {
|
||||
invalidated: AtomicBool::new(false),
|
||||
mix_fetch_config: self.config.mix_fetch,
|
||||
self_address,
|
||||
client_input,
|
||||
client_input: Arc::new(RwLock::new(client_input)),
|
||||
requests: active_requests,
|
||||
// this cannot failed as we haven't passed an external task manager
|
||||
_task_manager: Mutex::new(started_client.task_handle.try_into_task_manager().unwrap()),
|
||||
@@ -208,6 +211,8 @@ impl MixFetchClient {
|
||||
// the expect here is fine as it implies an unrecoverable failure since one of the client core
|
||||
// tasks has terminated
|
||||
self.client_input
|
||||
.write()
|
||||
.await
|
||||
.input_sender
|
||||
.send(input)
|
||||
.await
|
||||
@@ -235,6 +240,8 @@ impl MixFetchClient {
|
||||
// the expect here is fine as it implies an unrecoverable failure since one of the client core
|
||||
// tasks has terminated
|
||||
self.client_input
|
||||
.write()
|
||||
.await
|
||||
.input_sender
|
||||
.send(input)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user