Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 23d648b00f | |||
| a9df5da39c | |||
| 15ea11d5fd | |||
| 87361e076c | |||
| 935655cfb6 | |||
| 9bda144c99 | |||
| 77f14a3703 |
Generated
+1
@@ -4474,6 +4474,7 @@ dependencies = [
|
||||
name = "nym-sdk"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bip39",
|
||||
"dotenvy",
|
||||
"futures",
|
||||
|
||||
@@ -22,13 +22,19 @@ In the future the SDK will be made up of several components, each of which will
|
||||
| Coconut | Create & verify Coconut credentials | 🛠️ |
|
||||
| Validator | Sign & broadcast Nyx blockchain transactions, query the blockchain | ❌ |
|
||||
|
||||
The `mixnet` component currently exposes the logic of two clients: the websocket client, and the socks client.
|
||||
The `mixnet` component currently exposes the logic of two clients: the [websocket client](../clients/websocket-client.md), and the [socks](../clients/socks5-client.md) client.
|
||||
|
||||
The `coconut` component is currently being worked on. Right now it exposes logic allowing for the creation of coconut credentials on the Sandbox testnet.
|
||||
|
||||
## Websocket client examples
|
||||
> All the codeblocks below can be found in the `nym-sdk` [examples directory](https://github.com/nymtech/nym/tree/release/{{platform_release_version}}/sdk/rust/nym-sdk/examples) in the monorepo. Just navigate to `nym/sdk/rust/nym-sdk/examples/` and run the files from there. If you wish to run these outside of the workspace - such as if you want to use one as the basis for your own project - then make sure to import the `sdk`, `tokio`, and `nym_bin_common` crates.
|
||||
|
||||
### Different message types
|
||||
There are two methods for sending messages through the mixnet using your client:
|
||||
* `send_plain_message()` is the most simple: pass the recipient address and the message you wish to send as a string (this was previously `send_str()`). This is a nicer-to-use wrapper around `send_message()`.
|
||||
* `send_message()` allows you to also define the amount of SURBs to send along with your message (which is sent as bytes).
|
||||
|
||||
### Simple example
|
||||
Lets look at a very simple example of how you can import and use the websocket client in a piece of Rust code (`examples/simple.rs`):
|
||||
|
||||
```rust,noplayground
|
||||
@@ -54,7 +60,7 @@ If you're integrating mixnet functionality into an existing app and want to inte
|
||||
```
|
||||
|
||||
### Anonymous replies with SURBs
|
||||
Both functions used to send messages through the mixnet (`send_str` and `send_bytes`) send a pre-determined number of SURBs along with their messages by default.
|
||||
Both functions used to send messages through the mixnet (`send_message` and `send_plain_message`) send a pre-determined number of SURBs along with their messages by default.
|
||||
|
||||
The number of SURBs is set [here](https://github.com/nymtech/nym/blob/release/{{platform_release_version}}/sdk/rust/nym-sdk/src/mixnet/client.rs#L34):
|
||||
|
||||
@@ -89,6 +95,13 @@ If you aren't running a Validator and Nym API, and just want to import a specifi
|
||||
{{#include ../../../../sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs}}
|
||||
```
|
||||
|
||||
### Send and receive in different tasks
|
||||
If you need to split the different actions of your client across different tasks, you can do so like this:
|
||||
|
||||
```rust, noplayground
|
||||
{{#include ../../../../sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs}}
|
||||
```
|
||||
|
||||
## Socks client example
|
||||
There is also the option to embed the [`socks5-client`](../clients/socks5-client.md) into your app code (`examples/socks5.rs`):
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ edition = "2021"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
bip39 = { workspace = true }
|
||||
nym-client-core = { path = "../../../common/client-core", features = ["fs-surb-storage"]}
|
||||
nym-crypto = { path = "../../../common/crypto" }
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -29,7 +30,10 @@ async fn main() {
|
||||
let our_address = client.nym_address();
|
||||
|
||||
// Send a message throughout the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message");
|
||||
if let Some(received) = client.wait_for_messages().await {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -19,7 +20,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message");
|
||||
if let Some(received) = client.wait_for_messages().await {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -26,7 +27,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message throught the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message");
|
||||
if let Some(received) = client.wait_for_messages().await {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_topology::provider_trait::{async_trait, TopologyProvider};
|
||||
use nym_topology::{nym_topology_from_detailed, NymTopology};
|
||||
use url::Url;
|
||||
@@ -69,7 +70,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message (ctrl-c to exit)");
|
||||
client
|
||||
|
||||
@@ -3,6 +3,7 @@ use nym_client_core::client::base_client::storage::gateway_details::{
|
||||
};
|
||||
use nym_sdk::mixnet::{
|
||||
self, EmptyReplyStorage, EphemeralCredentialStorage, KeyManager, KeyStore, MixnetClientStorage,
|
||||
MixnetMessageSender,
|
||||
};
|
||||
use nym_topology::provider_trait::async_trait;
|
||||
|
||||
@@ -26,7 +27,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send important info up the pipe to a buddy
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message");
|
||||
if let Some(received) = client.wait_for_messages().await {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_topology::mix::Layer;
|
||||
use nym_topology::{mix, NymTopology};
|
||||
use std::collections::BTreeMap;
|
||||
@@ -81,7 +82,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message (ctrl-c to exit)");
|
||||
client
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::StreamExt;
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_logging();
|
||||
|
||||
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
|
||||
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
|
||||
// Be able to get our client address
|
||||
let our_address = *client.nym_address();
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
let sender = client.split_sender();
|
||||
|
||||
// receiving task
|
||||
let receiving_task_handle = tokio::spawn(async move {
|
||||
if let Some(received) = client.next().await {
|
||||
println!("Received: {}", String::from_utf8_lossy(&received.message));
|
||||
}
|
||||
|
||||
client.disconnect().await;
|
||||
});
|
||||
|
||||
// sending task
|
||||
let sending_task_handle = tokio::spawn(async move {
|
||||
sender
|
||||
.send_plain_message(our_address, "hello from a different task!")
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
// wait for both tasks to be done
|
||||
println!("waiting for shutdown");
|
||||
sending_task_handle.await.unwrap();
|
||||
receiving_task_handle.await.unwrap();
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -12,7 +13,10 @@ async fn main() {
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message throught the mixnet to ourselves
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message (ctrl-c to exit)");
|
||||
client
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use nym_sdk::mixnet::{
|
||||
AnonymousSenderTag, MixnetClientBuilder, ReconstructedMessage, StoragePaths,
|
||||
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
|
||||
StoragePaths,
|
||||
};
|
||||
use std::path::PathBuf;
|
||||
|
||||
@@ -28,7 +29,10 @@ async fn main() {
|
||||
println!("\nOur client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves using our nym address
|
||||
client.send_str(*our_address, "hello there").await;
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
|
||||
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
|
||||
@@ -57,7 +61,10 @@ async fn main() {
|
||||
|
||||
// reply to self with it: note we use `send_str_reply` instead of `send_str`
|
||||
println!("Replying with using SURBs");
|
||||
client.send_str_reply(return_recipient, "hi an0n!").await;
|
||||
client
|
||||
.send_reply(return_recipient, "hi an0n!")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
|
||||
client
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! # Basic example
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use nym_sdk::mixnet;
|
||||
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() {
|
||||
@@ -27,7 +27,7 @@
|
||||
//! let our_address = client.nym_address();
|
||||
//!
|
||||
//! // Send a message throughout the mixnet to ourselves
|
||||
//! client.send_str(*our_address, "hello there").await;
|
||||
//! client.send_plain_message(*our_address, "hello there").await.unwrap();
|
||||
//!
|
||||
//! println!("Waiting for message");
|
||||
//! if let Some(received) = client.wait_for_messages().await {
|
||||
|
||||
@@ -90,6 +90,9 @@ pub enum Error {
|
||||
|
||||
#[error("loaded shared gateway key without providing information about what gateway it corresponds to")]
|
||||
GatewayWithUnknownEndpoint,
|
||||
|
||||
#[error("failed to send the provided message")]
|
||||
MessageSendingFailure,
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
@@ -4,20 +4,20 @@
|
||||
//! # Basic example
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use nym_sdk::mixnet;
|
||||
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() {
|
||||
//! // Passing no config makes the client fire up an ephemeral session and figure stuff out on
|
||||
//! // its own
|
||||
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
//!
|
||||
//! // Be able to get our client address
|
||||
//! let our_address = client.nym_address();
|
||||
//! println!("Our client nym address is: {our_address}");
|
||||
//!
|
||||
//! // Send a message throught the mixnet to ourselves
|
||||
//! client.send_str(*our_address, "hello there").await;
|
||||
//! client.send_plain_message(*our_address, "hello there").await.unwrap();
|
||||
//!
|
||||
//! println!("Waiting for message");
|
||||
//! if let Some(received) = client.wait_for_messages().await {
|
||||
@@ -36,6 +36,7 @@ mod connection_state;
|
||||
mod native_client;
|
||||
mod paths;
|
||||
mod socks5_client;
|
||||
mod traits;
|
||||
|
||||
pub use client::{DisconnectedMixnetClient, IncludedSurbs, MixnetClientBuilder};
|
||||
pub use config::{Config, KeyMode};
|
||||
@@ -73,3 +74,4 @@ pub use nym_sphinx::{
|
||||
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
|
||||
pub use paths::StoragePaths;
|
||||
pub use socks5_client::Socks5MixnetClient;
|
||||
pub use traits::MixnetMessageSender;
|
||||
|
||||
@@ -497,15 +497,15 @@ where
|
||||
|
||||
let reconstructed_receiver = client_output.register_receiver()?;
|
||||
|
||||
Ok(MixnetClient {
|
||||
Ok(MixnetClient::new(
|
||||
nym_address,
|
||||
client_input,
|
||||
client_output,
|
||||
client_state,
|
||||
reconstructed_receiver,
|
||||
task_manager: started_client.task_manager,
|
||||
packet_type: None,
|
||||
})
|
||||
started_client.task_manager,
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
use crate::mixnet::client::MixnetClientBuilder;
|
||||
use crate::mixnet::traits::MixnetMessageSender;
|
||||
use crate::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
use log::error;
|
||||
use nym_client_core::client::{
|
||||
base_client::{ClientInput, ClientOutput, ClientState},
|
||||
inbound_messages::InputMessage,
|
||||
@@ -6,16 +12,12 @@ use nym_client_core::client::{
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
|
||||
use nym_task::{
|
||||
connections::{ConnectionCommandSender, LaneQueueLengths, TransmissionLane},
|
||||
connections::{ConnectionCommandSender, LaneQueueLengths},
|
||||
TaskManager,
|
||||
};
|
||||
|
||||
use futures::StreamExt;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_topology::NymTopology;
|
||||
|
||||
use crate::mixnet::client::{IncludedSurbs, MixnetClientBuilder};
|
||||
use crate::Result;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Client connected to the Nym mixnet.
|
||||
pub struct MixnetClient {
|
||||
@@ -41,9 +43,33 @@ pub struct MixnetClient {
|
||||
/// The task manager that controlls all the spawned tasks that the clients uses to do it's job.
|
||||
pub(crate) task_manager: TaskManager,
|
||||
pub(crate) packet_type: Option<PacketType>,
|
||||
|
||||
// internal state used for the `Stream` implementation
|
||||
_buffered: Vec<ReconstructedMessage>,
|
||||
}
|
||||
|
||||
impl MixnetClient {
|
||||
pub(crate) fn new(
|
||||
nym_address: Recipient,
|
||||
client_input: ClientInput,
|
||||
client_output: ClientOutput,
|
||||
client_state: ClientState,
|
||||
reconstructed_receiver: ReconstructedMessagesReceiver,
|
||||
task_manager: TaskManager,
|
||||
packet_type: Option<PacketType>,
|
||||
) -> Self {
|
||||
Self {
|
||||
nym_address,
|
||||
client_input,
|
||||
client_output,
|
||||
client_state,
|
||||
reconstructed_receiver,
|
||||
task_manager,
|
||||
packet_type,
|
||||
_buffered: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new client and connect to the mixnet using ephemeral in-memory keys that are
|
||||
/// discarded at application close.
|
||||
///
|
||||
@@ -74,9 +100,10 @@ impl MixnetClient {
|
||||
|
||||
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
|
||||
/// receive logic in different locations.
|
||||
pub fn sender(&self) -> MixnetClientSender {
|
||||
pub fn split_sender(&self) -> MixnetClientSender {
|
||||
MixnetClientSender {
|
||||
client_input: self.client_input.clone(),
|
||||
packet_type: self.packet_type,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,132 +139,6 @@ impl MixnetClient {
|
||||
self.client_state.topology_accessor.release_manual_control()
|
||||
}
|
||||
|
||||
/// Sends stringy data to the supplied Nym address
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let address = "foobar";
|
||||
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// client.send_str(recipient, "hi").await;
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send_str(&self, address: Recipient, message: &str) {
|
||||
self.send_bytes(address, message, IncludedSurbs::default())
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Sends bytes to the supplied Nym address. There is the option to specify the number of
|
||||
/// reply-SURBs to include.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let address = "foobar";
|
||||
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// let surbs = mixnet::IncludedSurbs::default();
|
||||
/// client.send_bytes(recipient, "hi".to_owned().into_bytes(), surbs).await;
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send_bytes<M: AsRef<[u8]>>(
|
||||
&self,
|
||||
address: Recipient,
|
||||
message: M,
|
||||
surbs: IncludedSurbs,
|
||||
) {
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = match surbs {
|
||||
IncludedSurbs::Amount(surbs) => InputMessage::new_anonymous(
|
||||
address,
|
||||
message.as_ref().to_vec(),
|
||||
surbs,
|
||||
lane,
|
||||
self.packet_type,
|
||||
),
|
||||
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(
|
||||
address,
|
||||
message.as_ref().to_vec(),
|
||||
lane,
|
||||
self.packet_type,
|
||||
),
|
||||
};
|
||||
self.send(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends stringy reply data to the supplied anonymous recipient.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// // note: the tag is something you would have received from a remote client sending you surbs!
|
||||
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
|
||||
/// client.send_str_reply(tag, "hi").await;
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send_str_reply(&self, recipient_tag: AnonymousSenderTag, message: &str) {
|
||||
self.send_reply(recipient_tag, message).await;
|
||||
}
|
||||
|
||||
/// Sends binary reply data to the supplied anonymous recipient.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet;
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// // note: the tag is something you would have received from a remote client sending you surbs!
|
||||
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
|
||||
/// client.send_reply(tag, b"hi").await;
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn send_reply<M: AsRef<[u8]>>(&self, recipient_tag: AnonymousSenderTag, message: M) {
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = InputMessage::new_reply(
|
||||
recipient_tag,
|
||||
message.as_ref().to_vec(),
|
||||
lane,
|
||||
self.packet_type,
|
||||
);
|
||||
self.send(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
|
||||
/// full customization.
|
||||
async fn send(&self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
|
||||
/// full customization.
|
||||
///
|
||||
/// Waits until the message is actually sent, or close to being sent, until returning.
|
||||
///
|
||||
/// NOTE: this not yet implemented.
|
||||
#[allow(unused)]
|
||||
async fn send_wait(&self, _message: InputMessage) {
|
||||
todo!();
|
||||
}
|
||||
|
||||
/// Wait for messages from the mixnet
|
||||
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
|
||||
self.reconstructed_receiver.next().await
|
||||
@@ -265,12 +166,63 @@ impl MixnetClient {
|
||||
|
||||
pub struct MixnetClientSender {
|
||||
client_input: ClientInput,
|
||||
packet_type: Option<PacketType>,
|
||||
}
|
||||
|
||||
impl MixnetClientSender {
|
||||
pub async fn send_input_message(&mut self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
impl Stream for MixnetClient {
|
||||
type Item = ReconstructedMessage;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if let Some(next) = self._buffered.pop() {
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Ready(Some(next));
|
||||
}
|
||||
match ready!(Pin::new(&mut self.reconstructed_receiver).poll_next(cx)) {
|
||||
None => Poll::Ready(None),
|
||||
Some(mut msgs) => {
|
||||
// the vector itself should never be empty
|
||||
if let Some(next) = msgs.pop() {
|
||||
// there's more than a single message - buffer them and wake the waker
|
||||
// to get polled again immediately
|
||||
if !msgs.is_empty() {
|
||||
self._buffered = msgs;
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Poll::Ready(Some(next))
|
||||
} else {
|
||||
error!("the reconstructed messages vector is empty - please let the developers know if you see this message");
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MixnetMessageSender for MixnetClient {
|
||||
fn packet_type(&self) -> Option<PacketType> {
|
||||
self.packet_type
|
||||
}
|
||||
|
||||
async fn send(&self, message: InputMessage) -> Result<()> {
|
||||
self.client_input
|
||||
.send(message)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MixnetMessageSender for MixnetClientSender {
|
||||
fn packet_type(&self) -> Option<PacketType> {
|
||||
self.packet_type
|
||||
}
|
||||
|
||||
async fn send(&self, message: InputMessage) -> Result<()> {
|
||||
self.client_input
|
||||
.send(message)
|
||||
.await
|
||||
.map_err(|_| Error::MessageSendingFailure)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::mixnet::{AnonymousSenderTag, IncludedSurbs, Recipient};
|
||||
use crate::Result;
|
||||
use async_trait::async_trait;
|
||||
use nym_client_core::client::inbound_messages::InputMessage;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
|
||||
// defined to guarantee common interface regardless of whether you're using the full client
|
||||
// or just the sending handler
|
||||
#[async_trait]
|
||||
pub trait MixnetMessageSender {
|
||||
fn packet_type(&self) -> Option<PacketType> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
|
||||
/// full customization.
|
||||
async fn send(&self, message: InputMessage) -> Result<()>;
|
||||
|
||||
/// Sends data to the supplied Nym address with the default surb behaviour.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let address = "foobar";
|
||||
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// client.send_plain_message(recipient, "hi").await.unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
|
||||
where
|
||||
M: AsRef<[u8]> + Send,
|
||||
{
|
||||
self.send_message(address, message, IncludedSurbs::default())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Sends bytes to the supplied Nym address. There is the option to specify the number of
|
||||
/// reply-SURBs to include.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let address = "foobar";
|
||||
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// let surbs = mixnet::IncludedSurbs::default();
|
||||
/// client.send_message(recipient, "hi".to_owned().into_bytes(), surbs).await.unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_message<M>(
|
||||
&self,
|
||||
address: Recipient,
|
||||
message: M,
|
||||
surbs: IncludedSurbs,
|
||||
) -> Result<()>
|
||||
where
|
||||
M: AsRef<[u8]> + Send,
|
||||
{
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = match surbs {
|
||||
IncludedSurbs::Amount(surbs) => InputMessage::new_anonymous(
|
||||
address,
|
||||
message.as_ref().to_vec(),
|
||||
surbs,
|
||||
lane,
|
||||
self.packet_type(),
|
||||
),
|
||||
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(
|
||||
address,
|
||||
message.as_ref().to_vec(),
|
||||
lane,
|
||||
self.packet_type(),
|
||||
),
|
||||
};
|
||||
self.send(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends reply data to the supplied anonymous recipient.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
/// // note: the tag is something you would have received from a remote client sending you surbs!
|
||||
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
|
||||
/// client.send_reply(tag, b"hi").await.unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
|
||||
where
|
||||
M: AsRef<[u8]> + Send,
|
||||
{
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = InputMessage::new_reply(
|
||||
recipient_tag,
|
||||
message.as_ref().to_vec(),
|
||||
lane,
|
||||
self.packet_type(),
|
||||
);
|
||||
self.send(input_msg).await
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,9 @@
|
||||
|
||||
// use nym_client::client::config::{BaseClientConfig, Config, GatewayEndpointConfig};
|
||||
// use nym_client::client::{DirectClient, KeyManager, Recipient, ReconstructedMessage, SocketClient};
|
||||
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, Recipient, ReconstructedMessage};
|
||||
use nym_sdk::mixnet::{
|
||||
IncludedSurbs, MixnetClient, MixnetMessageSender, Recipient, ReconstructedMessage,
|
||||
};
|
||||
use nym_service_providers_common::interface::{
|
||||
ControlRequest, ControlResponse, ProviderInterfaceVersion, Request, Response, ResponseContent,
|
||||
};
|
||||
@@ -51,34 +53,34 @@ async fn main() -> anyhow::Result<()> {
|
||||
// TODO: currently we HAVE TO use surbs unfortunately
|
||||
println!("Sending 'Health' request...");
|
||||
client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
provider,
|
||||
full_request_health.into_bytes(),
|
||||
IncludedSurbs::new(10),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
let response = wait_for_control_response(&mut client).await;
|
||||
println!("response to 'Health' request: {response:#?}");
|
||||
|
||||
println!("Sending 'BinaryInfo' request...");
|
||||
client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
provider,
|
||||
full_request_binary_info.into_bytes(),
|
||||
IncludedSurbs::none(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
let response = wait_for_control_response(&mut client).await;
|
||||
println!("response to 'BinaryInfo' request: {response:#?}");
|
||||
|
||||
println!("Sending 'SupportedRequestVersions' request...");
|
||||
client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
provider,
|
||||
full_request_versions.into_bytes(),
|
||||
IncludedSurbs::none(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
let response = wait_for_control_response(&mut client).await;
|
||||
println!("response to 'SupportedRequestVersions' request: {response:#?}");
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, Recipient, ReconstructedMessage};
|
||||
use nym_sdk::mixnet::{
|
||||
IncludedSurbs, MixnetClient, MixnetMessageSender, Recipient, ReconstructedMessage,
|
||||
};
|
||||
use nym_service_providers_common::interface::{
|
||||
ProviderInterfaceVersion, Request, Response, ResponseContent,
|
||||
};
|
||||
@@ -47,23 +49,23 @@ async fn main() -> anyhow::Result<()> {
|
||||
);
|
||||
println!("Sending 'OpenProxy' query...");
|
||||
client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
provider,
|
||||
open_proxy_request.into_bytes(),
|
||||
IncludedSurbs::new(10),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
let response = wait_for_response(&mut client).await;
|
||||
println!("response to 'OpenProxy' query: {response:#?}");
|
||||
|
||||
println!("Sending 'Description' query...");
|
||||
client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
provider,
|
||||
description_request.into_bytes(),
|
||||
IncludedSurbs::none(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
let response = wait_for_response(&mut client).await;
|
||||
println!("response to 'Description' query: {response:#?}");
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ use log::warn;
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_client_core::config::disk_persistence::CommonClientPaths;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_service_providers_common::interface::{
|
||||
BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion,
|
||||
};
|
||||
@@ -223,7 +224,7 @@ impl NRServiceProviderBuilder {
|
||||
};
|
||||
|
||||
let stats_collector_clone = stats_collector.clone();
|
||||
let mixnet_client_sender = mixnet_client.sender();
|
||||
let mixnet_client_sender = mixnet_client.split_sender();
|
||||
let self_address = *mixnet_client.nym_address();
|
||||
|
||||
// start the listener for mix messages
|
||||
@@ -297,7 +298,7 @@ impl NRServiceProvider {
|
||||
/// Listens for any messages from `mix_reader` that should be written back to the mix network
|
||||
/// via the `websocket_writer`.
|
||||
async fn mixnet_response_listener(
|
||||
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mut mix_input_reader: MixProxyReader<MixnetMessage>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
) {
|
||||
@@ -321,7 +322,7 @@ impl NRServiceProvider {
|
||||
}
|
||||
|
||||
let response_message = msg.into_input_message();
|
||||
mixnet_client_sender.send_input_message(response_message).await;
|
||||
mixnet_client_sender.send(response_message).await.unwrap();
|
||||
} else {
|
||||
log::error!("Exiting: channel closed!");
|
||||
break;
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::fmt;
|
||||
|
||||
use clap::{Parser, ValueEnum};
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_sdk::mixnet::{self, IncludedSurbs};
|
||||
use nym_sdk::mixnet::{self, IncludedSurbs, MixnetMessageSender};
|
||||
use nym_service_providers_common::interface::{
|
||||
ControlRequest, ControlResponse, ProviderInterfaceVersion, Request, Response, ResponseContent,
|
||||
};
|
||||
@@ -154,34 +154,37 @@ impl QueryClient {
|
||||
|
||||
async fn query_bin_info(&mut self) -> ControlResponse {
|
||||
self.client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
self.provider,
|
||||
new_bin_info_request().into_bytes(),
|
||||
IncludedSurbs::new(10),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_control_response(&mut self.client).await
|
||||
}
|
||||
|
||||
async fn query_supported_versions(&mut self) -> ControlResponse {
|
||||
self.client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
self.provider,
|
||||
new_supported_request_versions_request().into_bytes(),
|
||||
IncludedSurbs::new(10),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_control_response(&mut self.client).await
|
||||
}
|
||||
|
||||
async fn query_open_proxy(&mut self) -> QueryResponse {
|
||||
self.client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
self.provider,
|
||||
new_open_proxy_request().into_bytes(),
|
||||
IncludedSurbs::new(10),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
let response = wait_for_socks5_response(&mut self.client).await;
|
||||
response
|
||||
.content
|
||||
@@ -193,12 +196,13 @@ impl QueryClient {
|
||||
async fn ping(&mut self) -> PingResponse {
|
||||
let now = std::time::Instant::now();
|
||||
self.client
|
||||
.send_bytes(
|
||||
.send_message(
|
||||
self.provider,
|
||||
new_ping_request().into_bytes(),
|
||||
IncludedSurbs::new(5),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
let response = wait_for_control_response(&mut self.client).await;
|
||||
assert!(matches!(response, ControlResponse::Health));
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
Reference in New Issue
Block a user