Compare commits

...

14 Commits

Author SHA1 Message Date
durch 1b33d8e2c9 Address part of PR comments 2024-06-25 15:01:18 +02:00
durch 2fe282d774 fmt 2024-06-19 13:11:18 +02:00
durch abfd121d4e Log decoding error 2024-06-19 13:09:34 +02:00
durch 46ebb7039a Cleanup prints 2024-06-18 19:54:39 +02:00
durch bc28cc70b7 Update IPR sig 2024-06-18 11:58:03 +02:00
durch 36e8468b73 WASM changes 2024-06-18 10:30:56 +02:00
durch cefb217c25 AsyncWrite 2024-06-18 10:30:56 +02:00
durch 4adcd32ebf Use tokio AsyncRead 2024-06-18 10:30:56 +02:00
durch 7820b816b5 ReconstructedMessageCodec 2024-06-18 10:30:56 +02:00
Drazen f54cee786c InputMessageCodec, Serde for MixPacket 2024-06-18 10:30:56 +02:00
Drazen edd9fef468 Use Sink always 2024-06-18 10:30:56 +02:00
Drazen eed87ff4c9 Switch to PollSender 2024-06-18 10:30:56 +02:00
durch 575056ac67 AsyncRead for MixnetClient 2024-06-18 10:30:56 +02:00
durch 34e9822f1d serde for ReconstructedMessage 2024-06-18 10:30:56 +02:00
31 changed files with 553 additions and 55 deletions
Generated
+10
View File
@@ -4138,6 +4138,7 @@ version = "1.1.15"
dependencies = [
"async-trait",
"base64 0.21.7",
"bincode",
"bs58 0.5.1",
"cfg-if",
"clap 4.5.4",
@@ -4180,6 +4181,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util",
"tungstenite",
"url",
"wasm-bindgen",
@@ -4253,6 +4255,7 @@ dependencies = [
"serde-wasm-bindgen 0.6.5",
"serde_json",
"thiserror",
"tokio",
"tsify",
"wasm-bindgen",
"wasm-bindgen-futures",
@@ -4956,6 +4959,7 @@ dependencies = [
"time",
"tokio",
"tokio-tungstenite",
"tokio-util",
"url",
"zeroize",
]
@@ -5300,6 +5304,7 @@ dependencies = [
"tap",
"thiserror",
"tokio",
"tokio-util",
"url",
]
@@ -5359,6 +5364,7 @@ dependencies = [
name = "nym-sphinx"
version = "0.1.0"
dependencies = [
"bincode",
"log",
"nym-crypto",
"nym-mixnet-contract-common",
@@ -5375,8 +5381,10 @@ dependencies = [
"nym-topology",
"rand 0.8.5",
"rand_distr",
"serde",
"thiserror",
"tokio",
"tokio-util",
]
[[package]]
@@ -5463,6 +5471,7 @@ dependencies = [
"nym-sphinx-addressing",
"nym-sphinx-params",
"nym-sphinx-types",
"serde",
"thiserror",
]
@@ -5540,6 +5549,7 @@ version = "0.1.0"
dependencies = [
"futures",
"log",
"serde",
"thiserror",
"tokio",
"wasm-bindgen",
+5 -1
View File
@@ -10,6 +10,7 @@ license.workspace = true
[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
base64 = "0.21.2"
bs58 = { workspace = true }
cfg-if = { workspace = true }
@@ -26,6 +27,7 @@ tap = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros"] }
tokio-util = { workspace = true, features = ["codec"] }
time = { workspace = true }
zeroize = { workspace = true }
@@ -47,7 +49,9 @@ nym-validator-client = { path = "../client-libs/validator-client", default-featu
nym-task = { path = "../task" }
nym-credential-storage = { path = "../credential-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-client-core-config-types = { path = "./config-types", features = ["disk-persistence"] }
nym-client-core-config-types = { path = "./config-types", features = [
"disk-persistence",
] }
nym-client-core-surb-storage = { path = "./surb-storage" }
nym-client-core-gateways-storage = { path = "./gateways-storage" }
@@ -35,6 +35,7 @@ use crate::init::{
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use futures::SinkExt;
use log::{debug, error, info, warn};
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
@@ -59,6 +60,7 @@ use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use tokio_util::sync::{PollSendError, PollSender};
use url::Url;
#[cfg(all(
@@ -78,10 +80,7 @@ pub struct ClientInput {
}
impl ClientInput {
pub async fn send(
&self,
message: InputMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
pub async fn send(&mut self, message: InputMessage) -> Result<(), PollSendError<InputMessage>> {
self.input_sender.send(message).await
}
}
@@ -804,7 +803,7 @@ where
client_input: ClientInputStatus::AwaitingProducer {
client_input: ClientInput {
connection_command_sender: client_connection_tx,
input_sender,
input_sender: PollSender::new(input_sender),
},
},
client_output: ClientOutputStatus::AwaitingConsumer {
@@ -1,16 +1,26 @@
// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
use tokio_util::{
bytes::Buf,
bytes::BytesMut,
codec::{Decoder, Encoder},
};
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
use crate::error::ClientCoreError;
pub type InputMessageSender = tokio_util::sync::PollSender<InputMessage>;
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
#[derive(Debug)]
const LENGHT_ENCODING_PREFIX_SIZE: usize = 4;
#[derive(Serialize, Deserialize, Debug)]
pub enum InputMessage {
/// Fire an already prepared mix packets into the network.
/// No guarantees are made about it. For example no retransmssion
@@ -64,6 +74,10 @@ pub enum InputMessage {
}
impl InputMessage {
pub fn simple(data: &[u8], recipient: Recipient) -> Self {
InputMessage::new_regular(recipient, data.to_vec(), TransmissionLane::General, None)
}
pub fn new_premade(
msgs: Vec<MixPacket>,
lane: TransmissionLane,
@@ -197,4 +211,70 @@ impl InputMessage {
InputMessage::MessageWrapper { message, .. } => message.lane(),
}
}
pub fn serialized_size(&self) -> u64 {
bincode::serialized_size(self).expect("failed to get serialized InputMessage size")
+ LENGHT_ENCODING_PREFIX_SIZE as u64
}
}
// TODO: Tests
pub struct AdressedInputMessageCodec(pub Recipient);
impl Encoder<&[u8]> for AdressedInputMessageCodec {
type Error = ClientCoreError;
fn encode(&mut self, item: &[u8], buf: &mut BytesMut) -> Result<(), Self::Error> {
let mut codec = InputMessageCodec;
let input_message = InputMessage::simple(item, self.0.clone());
codec.encode(input_message, buf)?;
Ok(())
}
}
pub struct InputMessageCodec;
impl Encoder<InputMessage> for InputMessageCodec {
type Error = ClientCoreError;
fn encode(&mut self, item: InputMessage, buf: &mut BytesMut) -> Result<(), Self::Error> {
let encoded = bincode::serialize(&item).expect("failed to serialize InputMessage");
let encoded_len = encoded.len() as u32;
let mut encoded_with_len = encoded_len.to_le_bytes().to_vec();
encoded_with_len.extend(encoded);
buf.reserve(encoded_with_len.len());
buf.extend_from_slice(&encoded_with_len);
Ok(())
}
}
impl Decoder for InputMessageCodec {
type Item = InputMessage;
type Error = ClientCoreError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if buf.len() < LENGHT_ENCODING_PREFIX_SIZE {
return Ok(None);
}
let len = u32::from_le_bytes(
buf[0..LENGHT_ENCODING_PREFIX_SIZE]
.try_into()
.expect("Could not coarce to array"),
) as usize;
if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE {
return Ok(None);
}
let decoded = match bincode::deserialize(
&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE],
) {
Ok(decoded) => decoded,
Err(_) => return Ok(None),
};
buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE);
Ok(Some(decoded))
}
}
+16 -3
View File
@@ -12,6 +12,9 @@ log = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
bincode = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
nym-sphinx-acknowledgements = { path = "acknowledgements" }
nym-sphinx-addressing = { path = "addressing" }
@@ -30,7 +33,9 @@ nym-topology = { path = "../topology" }
[dev-dependencies]
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
nym-crypto = { path = "../crypto", version = "0.4.0", features = ["asymmetric"] }
nym-crypto = { path = "../crypto", version = "0.4.0", features = [
"asymmetric",
] }
# do not include this when compiling into wasm as it somehow when combined together with reqwest, it will require
# net2 via tokio-util -> tokio -> mio -> net2
@@ -43,5 +48,13 @@ features = ["sync"]
[features]
default = ["sphinx"]
sphinx = ["nym-crypto/sphinx", "nym-sphinx-params/sphinx", "nym-sphinx-types/sphinx"]
outfox = ["nym-crypto/outfox", "nym-sphinx-params/outfox", "nym-sphinx-types/outfox"]
sphinx = [
"nym-crypto/sphinx",
"nym-sphinx-params/sphinx",
"nym-sphinx-types/sphinx",
]
outfox = [
"nym-crypto/outfox",
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
@@ -4,6 +4,7 @@
use crate::{ReplySurb, ReplySurbError};
use nym_sphinx_addressing::clients::{Recipient, RecipientFormattingError};
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::mem;
use thiserror::Error;
@@ -24,7 +25,7 @@ pub enum InvalidAnonymousSenderTagRepresentation {
InvalidLength { received: usize, expected: usize },
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen)]
pub struct AnonymousSenderTag([u8; SENDER_TAG_SIZE]);
+1
View File
@@ -13,3 +13,4 @@ nym-sphinx-params = { path = "../params" }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-outfox = { path = "../../../nym-outfox" }
thiserror = { workspace = true }
serde = { workspace = true }
+37
View File
@@ -4,6 +4,10 @@
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{NymPacket, NymPacketError};
use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
@@ -110,6 +114,39 @@ impl MixPacket {
.chain(self.packet.to_bytes()?)
.collect())
}
pub fn to_bytes(&self) -> Result<Vec<u8>, MixPacketFormattingError> {
Ok(std::iter::once(self.packet_type as u8)
.chain(self.next_hop.as_bytes())
.chain(self.packet.to_bytes()?)
.collect())
}
}
impl Serialize for MixPacket {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.to_bytes().map_err(serde::ser::Error::custom)?)
}
}
struct MixPacketVisitor;
impl<'de> Visitor<'de> for MixPacketVisitor {
type Value = MixPacket;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a byte array representing a mix packet")
}
fn visit_bytes<E: de::Error>(self, v: &[u8]) -> Result<Self::Value, E> {
MixPacket::try_from_bytes(v).map_err(serde::de::Error::custom)
}
}
impl<'de> Deserialize<'de> for MixPacket {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
deserializer.deserialize_bytes(MixPacketVisitor)
}
}
// TODO: test for serialization and errors!
+66 -1
View File
@@ -1,7 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::io;
use crate::message::{NymMessage, NymMessageError, PaddedMessage, PlainMessage};
use log::debug;
use nym_crypto::aes::cipher::{KeyIvInit, StreamCipher};
use nym_crypto::asymmetric::encryption;
use nym_crypto::shared_key::recompute_shared_key;
@@ -16,10 +19,13 @@ use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm,
DEFAULT_NUM_MIX_HOPS,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_util::bytes::{Buf, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
// TODO: should this live in this file?
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconstructedMessage {
/// The actual plaintext message that was received.
pub message: Vec<u8>,
@@ -57,6 +63,62 @@ impl From<PlainMessage> for ReconstructedMessage {
}
}
pub struct ReconstructedMessageCodec;
const LENGHT_ENCODING_PREFIX_SIZE: usize = 4;
impl Encoder<ReconstructedMessage> for ReconstructedMessageCodec {
type Error = MessageRecoveryError;
fn encode(
&mut self,
item: ReconstructedMessage,
buf: &mut BytesMut,
) -> Result<(), Self::Error> {
let encoded = bincode::serialize(&item).expect("failed to serialize ReconstructedMessage");
let encoded_len = encoded.len() as u32;
let mut encoded_with_len = encoded_len.to_le_bytes().to_vec();
encoded_with_len.extend(encoded);
buf.reserve(encoded_with_len.len());
buf.extend_from_slice(&encoded_with_len);
Ok(())
}
}
impl Decoder for ReconstructedMessageCodec {
type Item = ReconstructedMessage;
type Error = MessageRecoveryError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if buf.len() < LENGHT_ENCODING_PREFIX_SIZE {
return Ok(None);
}
let len = u32::from_le_bytes(
buf[0..LENGHT_ENCODING_PREFIX_SIZE]
.try_into()
.expect("We know that we have at least LENGHT_ENCODING_PREFIX_SIZE bytes in there"),
) as usize;
if buf.len() < len + LENGHT_ENCODING_PREFIX_SIZE {
return Ok(None);
}
let decoded = match bincode::deserialize(
&buf[LENGHT_ENCODING_PREFIX_SIZE..len + LENGHT_ENCODING_PREFIX_SIZE],
) {
Ok(decoded) => decoded,
Err(e) => {
debug!("Failed to decode the message - {:?}", e);
return Ok(None);
}
};
buf.advance(len + LENGHT_ENCODING_PREFIX_SIZE);
Ok(Some(decoded))
}
}
#[derive(Debug, Error)]
pub enum MessageRecoveryError {
#[error("The received message did not contain enough bytes to recover the ephemeral public key. Got {provided}. required: {required}")]
@@ -74,6 +136,9 @@ pub enum MessageRecoveryError {
#[error("Failed to recover message fragment - {0}")]
FragmentRecoveryError(#[from] ChunkingError),
#[error("Failed to recover message fragment - {0}")]
MessageRecoveryError(#[from] io::Error),
}
pub trait MessageReceiver {
+1
View File
@@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] } # for config serialization/d
tap = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { workspace = true }
url = { workspace = true }
nym-bandwidth-controller = { path = "../../common/bandwidth-controller" }
@@ -7,6 +7,7 @@ use super::{SocksVersion, RESERVED, SOCKS4_VERSION, SOCKS5_VERSION};
use crate::config;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::SinkExt;
use log::*;
use nym_client_core::client::inbound_messages::{InputMessage, InputMessageSender};
use nym_service_providers_common::interface::{ProviderInterfaceVersion, RequestVersion};
@@ -3,11 +3,12 @@
use crate::proxy_runner::MixProxySender;
use bytes::Bytes;
use futures::SinkExt;
use log::{debug, error};
use nym_socks5_requests::{ConnectionId, SocketData};
use std::io;
pub(crate) struct OrderedMessageSender<F, S> {
pub(crate) struct OrderedMessageSender<F, S: Send + 'static> {
connection_id: ConnectionId,
// addresses are provided for better logging
local_destination_address: String,
@@ -18,7 +19,7 @@ pub(crate) struct OrderedMessageSender<F, S> {
mix_message_adapter: F,
}
impl<F, S> OrderedMessageSender<F, S>
impl<F, S: Send + 'static> OrderedMessageSender<F, S>
where
F: Fn(SocketData) -> S,
{
@@ -55,7 +56,7 @@ where
(self.mix_message_adapter)(data)
}
async fn send_message(&self, message: S) {
async fn send_message(&mut self, message: S) {
if self.mixnet_sender.send(message).await.is_err() {
panic!("BatchRealMessageReceiver has stopped receiving!")
}
@@ -74,7 +74,7 @@ async fn wait_for_lane(
}
}
pub(super) async fn run_inbound<F, S>(
pub(super) async fn run_inbound<F, S: Send>(
mut reader: OwnedReadHalf,
mut message_sender: OrderedMessageSender<F, S>,
connection_id: ConnectionId,
@@ -9,6 +9,7 @@ use nym_task::TaskClient;
use std::fmt::Debug;
use std::{sync::Arc, time::Duration};
use tokio::{net::TcpStream, sync::Notify};
use tokio_util::sync::PollSender;
mod inbound;
mod outbound;
@@ -35,7 +36,7 @@ impl From<(Vec<u8>, bool)> for ProxyMessage {
}
}
pub type MixProxySender<S> = tokio::sync::mpsc::Sender<S>;
pub type MixProxySender<S> = PollSender<S>;
pub type MixProxyReader<S> = tokio::sync::mpsc::Receiver<S>;
// TODO: when we finally get to implementing graceful shutdown,
+1 -1
View File
@@ -19,7 +19,7 @@ pub trait StatisticsCollector {
interval: Duration,
timestamp: DateTime<Utc>,
) -> StatsMessage;
async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError>;
async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError>;
async fn reset_stats(&mut self);
}
+1
View File
@@ -12,6 +12,7 @@ futures = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "sync"] }
serde = { workspace = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
+2 -1
View File
@@ -2,11 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type ConnectionId = u64;
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransmissionLane {
General,
// we need to treat surb-related requests and responses at higher priority
+1 -1
View File
@@ -52,7 +52,7 @@ impl StatisticsCollector for GatewayStatisticsCollector {
}
}
async fn send_stats_message(&self, stats_message: StatsMessage) -> Result<(), StatsError> {
async fn send_stats_message(&mut self, stats_message: StatsMessage) -> Result<(), StatsError> {
build_and_send_statistics_request(stats_message, self.statistics_service_url.to_string())
.await
}
+9 -2
View File
@@ -9,7 +9,10 @@ license.workspace = true
[dependencies]
async-trait = { workspace = true }
bip39 = { workspace = true }
nym-client-core = { path = "../../../common/client-core", features = ["fs-surb-storage", "fs-gateways-storage"] }
nym-client-core = { path = "../../../common/client-core", features = [
"fs-surb-storage",
"fs-gateways-storage",
] }
nym-crypto = { path = "../../../common/crypto" }
nym-gateway-requests = { path = "../../../gateway/gateway-requests" }
nym-bandwidth-controller = { path = "../../../common/bandwidth-controller" }
@@ -21,7 +24,9 @@ nym-sphinx = { path = "../../../common/nymsphinx" }
nym-task = { path = "../../../common/task" }
nym-topology = { path = "../../../common/topology" }
nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = ["http-client"] }
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = [
"http-client",
] }
nym-socks5-requests = { path = "../../../common/socks5/requests" }
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
nym-service-providers-common = { path = "../../../service-providers/common" }
@@ -37,6 +42,8 @@ tap = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true }
toml = "0.5.10"
tokio = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
[dev-dependencies]
anyhow = { workspace = true }
@@ -16,7 +16,7 @@ async fn main() {
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
let sender = client.split_sender();
let mut sender = client.split_sender();
// receiving task
let receiving_task_handle = tokio::spawn(async move {
+249 -4
View File
@@ -2,9 +2,11 @@ use crate::mixnet::client::MixnetClientBuilder;
use crate::mixnet::traits::MixnetMessageSender;
use crate::{Error, Result};
use async_trait::async_trait;
use futures::{ready, Stream, StreamExt};
use bytes::{Buf as _, BytesMut};
use futures::{ready, FutureExt, Sink, SinkExt, Stream, StreamExt};
use log::error;
use nym_client_core::client::base_client::GatewayConnection;
use nym_client_core::client::inbound_messages::InputMessageCodec;
use nym_client_core::client::{
base_client::{ClientInput, ClientOutput, ClientState},
inbound_messages::InputMessage,
@@ -12,15 +14,18 @@ use nym_client_core::client::{
};
use nym_crypto::asymmetric::identity;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::receiver::ReconstructedMessageCodec;
use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
use nym_task::{
connections::{ConnectionCommandSender, LaneQueueLengths},
TaskHandle,
};
use nym_topology::NymTopology;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_util::codec::{Encoder, FramedRead};
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -51,6 +56,24 @@ pub struct MixnetClient {
// internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>,
// internal state used for the `AsyncRead` implementation
_read: ReadBuffer,
}
#[derive(Debug, Default)]
struct ReadBuffer {
buffer: BytesMut,
}
impl ReadBuffer {
fn clear(&mut self) {
self.buffer.clear();
}
fn pending(&self) -> bool {
!self.buffer.is_empty()
}
}
impl MixnetClient {
@@ -75,6 +98,7 @@ impl MixnetClient {
task_handle,
packet_type,
_buffered: Vec::new(),
_read: ReadBuffer::default(),
}
}
@@ -190,6 +214,25 @@ impl MixnetClient {
// note: it's important to take ownership of the struct as if the shutdown is `TaskHandle::External`,
// it must be dropped to finalize the shutdown
}
fn read_buffer_to_slice(
&mut self,
buf: &mut ReadBuf,
cx: &mut Context<'_>,
) -> Poll<tokio::io::Result<()>> {
if self._read.buffer.len() < buf.capacity() {
// let written = self._read.buffer.len();
buf.put_slice(&self._read.buffer);
self._read.clear();
Poll::Ready(Ok(()))
} else {
let written = buf.capacity();
buf.put_slice(&self._read.buffer.split_off(written));
self._read.buffer.advance(written);
cx.waker().wake_by_ref();
Poll::Ready(Ok(()))
}
}
}
#[derive(Clone)]
@@ -198,6 +241,200 @@ pub struct MixnetClientSender {
packet_type: Option<PacketType>,
}
impl AsyncRead for MixnetClient {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<tokio::io::Result<()>> {
let mut codec = ReconstructedMessageCodec {};
if self._read.pending() {
return self.read_buffer_to_slice(buf, cx);
}
let msg = match self.as_mut().poll_next(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => return Poll::Pending,
};
match codec.encode(msg, &mut self._read.buffer) {
Ok(_) => {}
Err(e) => {
error!("failed to encode reconstructed message: {:?}", e);
return Poll::Ready(Err(tokio::io::Error::new(
tokio::io::ErrorKind::Other,
"failed to encode reconstructed message",
)));
}
};
self.read_buffer_to_slice(buf, cx)
}
}
impl AsyncWrite for MixnetClient {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let codec = InputMessageCodec {};
let mut reader = FramedRead::new(buf, codec);
let mut fut = reader.next();
let msg = match fut.poll_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => msg,
Poll::Ready(Some(Err(_))) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to read message from input",
)))
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(Ok(0)),
};
let msg_size = msg.serialized_size();
let mut fut = pin!(self.client_input.send(msg));
match fut.poll_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)),
Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to send message to mixnet",
))),
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
Sink::poll_flush(self, cx)
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink"))
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
AsyncWrite::poll_flush(self, cx)
}
}
impl Sink<InputMessage> for MixnetClient {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.sender().poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)),
Poll::Pending => Poll::Pending,
}
}
fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> {
self.sender()
.start_send_unpin(item)
.map_err(|_| Error::MessageSendingFailure)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.sender()
.poll_flush_unpin(cx)
.map_err(|_| Error::MessageSendingFailure)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.sender()
.poll_close_unpin(cx)
.map_err(|_| Error::MessageSendingFailure)
}
}
// TODO: there should be a better way of implementing Sink and AsyncWrite over T: MixnetMessageSender
impl AsyncWrite for MixnetClientSender {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let codec = InputMessageCodec {};
let mut reader = FramedRead::new(buf, codec);
let mut fut = reader.next();
let msg = match fut.poll_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => msg,
Poll::Ready(Some(Err(_))) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to read message from input",
)))
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(Ok(0)),
};
let msg_size = msg.serialized_size();
let mut fut = pin!(self.client_input.send(msg));
match fut.poll_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(msg_size as usize)),
Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to send message to mixnet",
))),
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
Sink::poll_flush(self, cx)
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to flush the sink"))
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
AsyncWrite::poll_flush(self, cx)
}
}
impl Sink<InputMessage> for MixnetClientSender {
type Error = Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.sender().poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::MessageSendingFailure)),
Poll::Pending => Poll::Pending,
}
}
fn start_send(mut self: Pin<&mut Self>, item: InputMessage) -> Result<()> {
self.sender()
.start_send_unpin(item)
.map_err(|_| Error::MessageSendingFailure)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.sender()
.poll_flush_unpin(cx)
.map_err(|_| Error::MessageSendingFailure)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.sender()
.poll_close_unpin(cx)
.map_err(|_| Error::MessageSendingFailure)
}
}
impl Stream for MixnetClient {
type Item = ReconstructedMessage;
@@ -234,12 +471,16 @@ impl MixnetMessageSender for MixnetClient {
self.packet_type
}
async fn send(&self, message: InputMessage) -> Result<()> {
async fn send(&mut self, message: InputMessage) -> Result<()> {
self.client_input
.send(message)
.await
.map_err(|_| Error::MessageSendingFailure)
}
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage> {
&mut self.client_input.input_sender
}
}
#[async_trait]
@@ -248,10 +489,14 @@ impl MixnetMessageSender for MixnetClientSender {
self.packet_type
}
async fn send(&self, message: InputMessage) -> Result<()> {
async fn send(&mut self, message: InputMessage) -> Result<()> {
self.client_input
.send(message)
.await
.map_err(|_| Error::MessageSendingFailure)
}
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage> {
&mut self.client_input.input_sender
}
}
+6 -4
View File
@@ -16,9 +16,11 @@ pub trait MixnetMessageSender {
None
}
fn sender(&mut self) -> &mut tokio_util::sync::PollSender<InputMessage>;
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
/// full customization.
async fn send(&self, message: InputMessage) -> Result<()>;
async fn send(&mut self, message: InputMessage) -> Result<()>;
/// Sends data to the supplied Nym address with the default surb behaviour.
///
@@ -35,7 +37,7 @@ pub trait MixnetMessageSender {
/// client.send_plain_message(recipient, "hi").await.unwrap();
/// }
/// ```
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
async fn send_plain_message<M>(&mut self, address: Recipient, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
{
@@ -61,7 +63,7 @@ pub trait MixnetMessageSender {
/// }
/// ```
async fn send_message<M>(
&self,
&mut self,
address: Recipient,
message: M,
surbs: IncludedSurbs,
@@ -103,7 +105,7 @@ pub trait MixnetMessageSender {
/// client.send_reply(tag, b"hi").await.unwrap();
/// }
/// ```
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
async fn send_reply<M>(&mut self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
{
@@ -605,7 +605,7 @@ impl MixnetListener {
// When an incoming mixnet message triggers a response that we send back, such as during
// connect handshake.
async fn handle_response(&self, response: IpPacketResponse) -> Result<()> {
async fn handle_response(&mut self, response: IpPacketResponse) -> Result<()> {
let Some(recipient) = response.recipient() else {
log::error!("No recipient in response packet, this should NOT happen!");
return Err(IpPacketRouterError::NoRecipientInResponse);
@@ -635,7 +635,7 @@ impl MixnetListener {
// A single incoming request can trigger multiple responses, such as when data requests contain
// multiple IP packets.
async fn handle_responses(&self, responses: Vec<PacketHandleResult>) {
async fn handle_responses(&mut self, responses: Vec<PacketHandleResult>) {
for response in responses {
match response {
Ok(Some(response)) => {
+12 -5
View File
@@ -20,7 +20,7 @@ anyhow = { workspace = true }
addr = { workspace = true }
async-trait = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"]}
clap = { workspace = true, features = ["cargo", "derive"] }
dirs = "4.0"
futures = { workspace = true }
humantime-serde = { workspace = true }
@@ -33,19 +33,26 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"]}
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "chrono"] }
tap = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [ "net", "rt-multi-thread", "macros" ] }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] }
tokio-tungstenite = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }
time = { workspace = true }
zeroize = { workspace = true }
# internal
nym-async-file-watcher = { path = "../../common/async-file-watcher" }
nym-bin-common = { path = "../../common/bin-common", features = ["output_format"] }
nym-client-core = { path = "../../common/client-core", features = ["cli", "fs-gateways-storage", "fs-surb-storage"] }
nym-bin-common = { path = "../../common/bin-common", features = [
"output_format",
] }
nym-client-core = { path = "../../common/client-core", features = [
"cli",
"fs-gateways-storage",
"fs-surb-storage",
] }
nym-client-websocket-requests = { path = "../../clients/native/websocket-requests" }
nym-config = { path = "../../common/config" }
nym-credentials = { path = "../../common/credentials" }
@@ -10,13 +10,14 @@ use crate::{reply, socks5};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use futures::SinkExt;
use log::{debug, warn};
use nym_bin_common::bin_info_owned;
use nym_client_core::client::mix_traffic::transceiver::GatewayTransceiver;
use nym_client_core::config::disk_persistence::CommonClientPaths;
use nym_client_core::HardcodedTopologyProvider;
use nym_network_defaults::NymNetworkDetails;
use nym_sdk::mixnet::{MixnetMessageSender, TopologyProvider};
use nym_sdk::mixnet::TopologyProvider;
use nym_service_providers_common::interface::{
BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion,
};
@@ -40,6 +41,7 @@ use nym_task::manager::TaskHandle;
use nym_task::TaskClient;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_util::sync::PollSender;
// Since it's an atomic, it's safe to be kept static and shared across threads
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
@@ -284,6 +286,8 @@ impl NRServiceProviderBuilder {
// going to be used by `mixnet_response_listener`
let (mix_input_sender, mix_input_receiver) = tokio::sync::mpsc::channel::<MixnetMessage>(1);
let mix_input_sender = PollSender::new(mix_input_sender);
// Controller for managing all active connections.
let (mut active_connections_controller, controller_sender) = Controller::new(
mixnet_client.connection_command_sender(),
@@ -400,7 +404,7 @@ impl NRServiceProvider {
/// Listens for any messages from `mix_reader` that should be written back to the mix network
/// via the `websocket_writer`.
async fn mixnet_response_listener(
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mut mix_input_reader: MixProxyReader<MixnetMessage>,
stats_collector: Option<ServiceStatisticsCollector>,
packet_type: PacketType,
@@ -425,7 +429,7 @@ impl NRServiceProvider {
}
let response_message = msg.into_input_message(packet_type);
mixnet_client_sender.send(response_message).await.unwrap();
nym_sdk::mixnet::MixnetMessageSender::send(&mut mixnet_client_sender, response_message).await.unwrap();
} else {
log::error!("Exiting: channel closed!");
break;
@@ -443,7 +447,7 @@ impl NRServiceProvider {
return_address: reply::MixnetAddress,
biggest_packet_size: PacketSize,
controller_sender: ControllerSender,
mix_input_sender: MixProxySender<MixnetMessage>,
mut mix_input_sender: MixProxySender<MixnetMessage>,
lane_queue_lengths: LaneQueueLengths,
mut shutdown: TaskClient,
) {
@@ -537,7 +541,7 @@ impl NRServiceProvider {
.unwrap_or(traffic_config.primary_packet_size);
let controller_sender_clone = self.controller_sender.clone();
let mix_input_sender_clone = self.mix_input_sender.clone();
let mut mix_input_sender_clone = self.mix_input_sender.clone();
let lane_queue_lengths_clone = self.mixnet_client.shared_lane_queue_lengths();
let mut shutdown = self.shutdown.get_handle();
@@ -5,6 +5,7 @@ use super::error::StatsError;
use crate::core::new_legacy_request_version;
use crate::reply::MixnetMessage;
use async_trait::async_trait;
use futures::SinkExt;
use log::*;
use nym_service_providers_common::interface::RequestVersion;
use nym_socks5_proxy_helpers::proxy_runner::MixProxySender;
@@ -165,7 +166,7 @@ impl StatisticsCollector for ServiceStatisticsCollector {
}
async fn send_stats_message(
&self,
&mut self,
stats_message: StatsMessage,
) -> Result<(), CommonStatsError> {
let msg = build_statistics_request_bytes(stats_message)?;
+7 -3
View File
@@ -1,6 +1,9 @@
[package]
name = "nym-client-wasm"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jedrzej Stuczynski <andrew@nymtech.net>"]
authors = [
"Dave Hrycyszyn <futurechimp@users.noreply.github.com>",
"Jedrzej Stuczynski <andrew@nymtech.net>",
]
version = "1.3.0-rc.0"
edition = "2021"
keywords = ["nym", "sphinx", "wasm", "webassembly", "privacy", "client"]
@@ -22,15 +25,16 @@ serde_json = { workspace = true }
serde-wasm-bindgen = { workspace = true }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }
thiserror = { workspace = true }
thiserror = { workspace = true }
tsify = { workspace = true, features = ["js"] }
tokio = { workspace = true, default-features = false, features = ["sync"] }
nym-bin-common = { path = "../../common/bin-common" }
wasm-client-core = { path = "../../common/wasm/client-core" }
wasm-utils = { path = "../../common/wasm/utils" }
nym-node-tester-utils = { path = "../../common/node-tester-utils", optional = true }
nym-node-tester-wasm = { path = "../node-tester", optional = true}
nym-node-tester-wasm = { path = "../node-tester", optional = true }
[dev-dependencies]
wasm-bindgen-test = { workspace = true }
+3 -2
View File
@@ -14,6 +14,7 @@ use crate::response_pusher::ResponsePusher;
use js_sys::Promise;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use tsify::Tsify;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::future_to_promise;
@@ -50,7 +51,7 @@ pub(crate) const NODE_TESTER_CLIENT_ID: &str = "_nym-node-tester-client";
#[wasm_bindgen]
pub struct NymClient {
self_address: String,
client_input: Arc<ClientInput>,
client_input: Arc<RwLock<ClientInput>>,
client_state: Arc<ClientState>,
// keep track of the "old" topology for the purposes of node tester
@@ -196,7 +197,7 @@ impl NymClientBuilder {
Ok(NymClient {
self_address,
client_input: Arc::new(client_input),
client_input: Arc::new(RwLock::new(client_input)),
client_state: Arc::new(started_client.client_state),
_full_topology: None,
// this cannot failed as we haven't passed an external task manager
+5 -1
View File
@@ -1,8 +1,10 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::SinkExt;
use js_sys::Promise;
use std::sync::Arc;
use tokio::sync::RwLock;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::future_to_promise;
use wasm_client_core::client::base_client::{ClientInput, ClientState};
@@ -48,10 +50,11 @@ pub(crate) trait InputSender {
fn send_messages(&self, messages: Vec<InputMessage>) -> Promise;
}
impl InputSender for Arc<ClientInput> {
impl InputSender for Arc<RwLock<ClientInput>> {
fn send_message(&self, message: InputMessage) -> Promise {
let this = Arc::clone(self);
future_to_promise(async move {
let mut this = this.write().await;
match this.input_sender.send(message).await {
Ok(_) => Ok(JsValue::null()),
Err(_) => Err(simple_js_error(
@@ -64,6 +67,7 @@ impl InputSender for Arc<ClientInput> {
fn send_messages(&self, messages: Vec<InputMessage>) -> Promise {
let this = Arc::clone(self);
future_to_promise(async move {
let mut this = this.write().await;
for message in messages {
if this.input_sender.send(message).await.is_err() {
return Err(simple_js_error(
+1 -1
View File
@@ -24,7 +24,7 @@ tokio = { workspace = true, features = ["sync"] }
url = { workspace = true }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }
thiserror = { workspace = true }
thiserror = { workspace = true }
tsify = { workspace = true, features = ["js"] }
nym-bin-common = { path = "../../common/bin-common" }
+9 -2
View File
@@ -8,10 +8,13 @@ use crate::go_bridge::goWasmSetMixFetchRequestTimeout;
use crate::request_writer::RequestWriter;
use crate::socks_helpers::{socks5_connect_request, socks5_data_request};
use crate::{config, RequestId};
use futures::SinkExt;
use js_sys::Promise;
use nym_socks5_requests::RemoteAddress;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::future_to_promise;
use wasm_client_core::client::base_client::{BaseClientBuilder, ClientInput, ClientOutput};
@@ -34,7 +37,7 @@ pub struct MixFetchClient {
self_address: Recipient,
client_input: ClientInput,
client_input: Arc<RwLock<ClientInput>>,
requests: ActiveRequests,
@@ -131,7 +134,7 @@ impl MixFetchClientBuilder {
invalidated: AtomicBool::new(false),
mix_fetch_config: self.config.mix_fetch,
self_address,
client_input,
client_input: Arc::new(RwLock::new(client_input)),
requests: active_requests,
// this cannot failed as we haven't passed an external task manager
_task_manager: Mutex::new(started_client.task_handle.try_into_task_manager().unwrap()),
@@ -208,6 +211,8 @@ impl MixFetchClient {
// the expect here is fine as it implies an unrecoverable failure since one of the client core
// tasks has terminated
self.client_input
.write()
.await
.input_sender
.send(input)
.await
@@ -235,6 +240,8 @@ impl MixFetchClient {
// the expect here is fine as it implies an unrecoverable failure since one of the client core
// tasks has terminated
self.client_input
.write()
.await
.input_sender
.send(input)
.await