Compare commits

...

7 Commits

Author SHA1 Message Date
mfahampshire 23d648b00f * added message types explainer
* added split task example
2023-08-17 14:01:03 +01:00
Jędrzej Stuczyński a9df5da39c removed redundant notify and instead awaiting the task futures 2023-08-17 14:01:03 +01:00
Jędrzej Stuczyński 15ea11d5fd fixed doc tests 2023-08-17 14:01:03 +01:00
Jędrzej Stuczyński 87361e076c cargo fmt 2023-08-17 14:01:02 +01:00
Jędrzej Stuczyński 935655cfb6 made the Stream return a single message at each call 2023-08-17 14:01:02 +01:00
Jędrzej Stuczyński 9bda144c99 unified sending interface + example of split sending 2023-08-17 14:01:01 +01:00
Jędrzej Stuczyński 77f14a3703 utility 'Stream' implementation for MixnetClient 2023-08-17 13:59:49 +01:00
22 changed files with 360 additions and 183 deletions
Generated
+1
View File
@@ -4474,6 +4474,7 @@ dependencies = [
name = "nym-sdk"
version = "0.1.0"
dependencies = [
"async-trait",
"bip39",
"dotenvy",
"futures",
+15 -2
View File
@@ -22,13 +22,19 @@ In the future the SDK will be made up of several components, each of which will
| Coconut | Create & verify Coconut credentials | 🛠️ |
| Validator | Sign & broadcast Nyx blockchain transactions, query the blockchain | ❌ |
The `mixnet` component currently exposes the logic of two clients: the websocket client, and the socks client.
The `mixnet` component currently exposes the logic of two clients: the [websocket client](../clients/websocket-client.md), and the [socks](../clients/socks5-client.md) client.
The `coconut` component is currently being worked on. Right now it exposes logic allowing for the creation of coconut credentials on the Sandbox testnet.
## Websocket client examples
> All the codeblocks below can be found in the `nym-sdk` [examples directory](https://github.com/nymtech/nym/tree/release/{{platform_release_version}}/sdk/rust/nym-sdk/examples) in the monorepo. Just navigate to `nym/sdk/rust/nym-sdk/examples/` and run the files from there. If you wish to run these outside of the workspace - such as if you want to use one as the basis for your own project - then make sure to import the `sdk`, `tokio`, and `nym_bin_common` crates.
### Different message types
There are two methods for sending messages through the mixnet using your client:
* `send_plain_message()` is the most simple: pass the recipient address and the message you wish to send as a string (this was previously `send_str()`). This is a nicer-to-use wrapper around `send_message()`.
* `send_message()` allows you to also define the amount of SURBs to send along with your message (which is sent as bytes).
### Simple example
Lets look at a very simple example of how you can import and use the websocket client in a piece of Rust code (`examples/simple.rs`):
```rust,noplayground
@@ -54,7 +60,7 @@ If you're integrating mixnet functionality into an existing app and want to inte
```
### Anonymous replies with SURBs
Both functions used to send messages through the mixnet (`send_str` and `send_bytes`) send a pre-determined number of SURBs along with their messages by default.
Both functions used to send messages through the mixnet (`send_message` and `send_plain_message`) send a pre-determined number of SURBs along with their messages by default.
The number of SURBs is set [here](https://github.com/nymtech/nym/blob/release/{{platform_release_version}}/sdk/rust/nym-sdk/src/mixnet/client.rs#L34):
@@ -89,6 +95,13 @@ If you aren't running a Validator and Nym API, and just want to import a specifi
{{#include ../../../../sdk/rust/nym-sdk/examples/manually_overwrite_topology.rs}}
```
### Send and receive in different tasks
If you need to split the different actions of your client across different tasks, you can do so like this:
```rust, noplayground
{{#include ../../../../sdk/rust/nym-sdk/examples/parallel_sending_and_receiving.rs}}
```
## Socks client example
There is also the option to embed the [`socks5-client`](../clients/socks5-client.md) into your app code (`examples/socks5.rs`):
+1
View File
@@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
bip39 = { workspace = true }
nym-client-core = { path = "../../../common/client-core", features = ["fs-surb-storage"]}
nym-crypto = { path = "../../../common/crypto" }
+5 -1
View File
@@ -1,4 +1,5 @@
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
@@ -29,7 +30,10 @@ async fn main() {
let our_address = client.nym_address();
// Send a message throughout the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message");
if let Some(received) = client.wait_for_messages().await {
+5 -1
View File
@@ -1,4 +1,5 @@
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
@@ -19,7 +20,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message");
if let Some(received) = client.wait_for_messages().await {
@@ -1,4 +1,5 @@
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use std::path::PathBuf;
#[tokio::main]
@@ -26,7 +27,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send a message throught the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message");
if let Some(received) = client.wait_for_messages().await {
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::provider_trait::{async_trait, TopologyProvider};
use nym_topology::{nym_topology_from_detailed, NymTopology};
use url::Url;
@@ -69,7 +70,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message (ctrl-c to exit)");
client
@@ -3,6 +3,7 @@ use nym_client_core::client::base_client::storage::gateway_details::{
};
use nym_sdk::mixnet::{
self, EmptyReplyStorage, EphemeralCredentialStorage, KeyManager, KeyStore, MixnetClientStorage,
MixnetMessageSender,
};
use nym_topology::provider_trait::async_trait;
@@ -26,7 +27,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send important info up the pipe to a buddy
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message");
if let Some(received) = client.wait_for_messages().await {
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::mix::Layer;
use nym_topology::{mix, NymTopology};
use std::collections::BTreeMap;
@@ -81,7 +82,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message (ctrl-c to exit)");
client
@@ -0,0 +1,42 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_logging();
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
// Be able to get our client address
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
let sender = client.split_sender();
// receiving task
let receiving_task_handle = tokio::spawn(async move {
if let Some(received) = client.next().await {
println!("Received: {}", String::from_utf8_lossy(&received.message));
}
client.disconnect().await;
});
// sending task
let sending_task_handle = tokio::spawn(async move {
sender
.send_plain_message(our_address, "hello from a different task!")
.await
.unwrap();
});
// wait for both tasks to be done
println!("waiting for shutdown");
sending_task_handle.await.unwrap();
receiving_task_handle.await.unwrap();
}
+5 -1
View File
@@ -1,4 +1,5 @@
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
@@ -12,7 +13,10 @@ async fn main() {
println!("Our client nym address is: {our_address}");
// Send a message throught the mixnet to ourselves
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message (ctrl-c to exit)");
client
+10 -3
View File
@@ -1,5 +1,6 @@
use nym_sdk::mixnet::{
AnonymousSenderTag, MixnetClientBuilder, ReconstructedMessage, StoragePaths,
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
StoragePaths,
};
use std::path::PathBuf;
@@ -28,7 +29,10 @@ async fn main() {
println!("\nOur client nym address is: {our_address}");
// Send a message through the mixnet to ourselves using our nym address
client.send_str(*our_address, "hello there").await;
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
@@ -57,7 +61,10 @@ async fn main() {
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
client.send_str_reply(return_recipient, "hi an0n!").await;
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
+2 -2
View File
@@ -6,7 +6,7 @@
//! # Basic example
//!
//! ```no_run
//! use nym_sdk::mixnet;
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
//!
//! #[tokio::main]
//! async fn main() {
@@ -27,7 +27,7 @@
//! let our_address = client.nym_address();
//!
//! // Send a message throughout the mixnet to ourselves
//! client.send_str(*our_address, "hello there").await;
//! client.send_plain_message(*our_address, "hello there").await.unwrap();
//!
//! println!("Waiting for message");
//! if let Some(received) = client.wait_for_messages().await {
+3
View File
@@ -90,6 +90,9 @@ pub enum Error {
#[error("loaded shared gateway key without providing information about what gateway it corresponds to")]
GatewayWithUnknownEndpoint,
#[error("failed to send the provided message")]
MessageSendingFailure,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
+5 -3
View File
@@ -4,20 +4,20 @@
//! # Basic example
//!
//! ```no_run
//! use nym_sdk::mixnet;
//! use nym_sdk::mixnet::{self, MixnetMessageSender};
//!
//! #[tokio::main]
//! async fn main() {
//! // Passing no config makes the client fire up an ephemeral session and figure stuff out on
//! // its own
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
//! let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
//!
//! // Be able to get our client address
//! let our_address = client.nym_address();
//! println!("Our client nym address is: {our_address}");
//!
//! // Send a message throught the mixnet to ourselves
//! client.send_str(*our_address, "hello there").await;
//! client.send_plain_message(*our_address, "hello there").await.unwrap();
//!
//! println!("Waiting for message");
//! if let Some(received) = client.wait_for_messages().await {
@@ -36,6 +36,7 @@ mod connection_state;
mod native_client;
mod paths;
mod socks5_client;
mod traits;
pub use client::{DisconnectedMixnetClient, IncludedSurbs, MixnetClientBuilder};
pub use config::{Config, KeyMode};
@@ -73,3 +74,4 @@ pub use nym_sphinx::{
pub use nym_topology::{provider_trait::TopologyProvider, NymTopology};
pub use paths::StoragePaths;
pub use socks5_client::Socks5MixnetClient;
pub use traits::MixnetMessageSender;
+4 -4
View File
@@ -497,15 +497,15 @@ where
let reconstructed_receiver = client_output.register_receiver()?;
Ok(MixnetClient {
Ok(MixnetClient::new(
nym_address,
client_input,
client_output,
client_state,
reconstructed_receiver,
task_manager: started_client.task_manager,
packet_type: None,
})
started_client.task_manager,
None,
))
}
}
+90 -138
View File
@@ -1,3 +1,9 @@
use crate::mixnet::client::MixnetClientBuilder;
use crate::mixnet::traits::MixnetMessageSender;
use crate::{Error, Result};
use async_trait::async_trait;
use futures::{ready, Stream, StreamExt};
use log::error;
use nym_client_core::client::{
base_client::{ClientInput, ClientOutput, ClientState},
inbound_messages::InputMessage,
@@ -6,16 +12,12 @@ use nym_client_core::client::{
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
use nym_task::{
connections::{ConnectionCommandSender, LaneQueueLengths, TransmissionLane},
connections::{ConnectionCommandSender, LaneQueueLengths},
TaskManager,
};
use futures::StreamExt;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_topology::NymTopology;
use crate::mixnet::client::{IncludedSurbs, MixnetClientBuilder};
use crate::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -41,9 +43,33 @@ pub struct MixnetClient {
/// The task manager that controlls all the spawned tasks that the clients uses to do it's job.
pub(crate) task_manager: TaskManager,
pub(crate) packet_type: Option<PacketType>,
// internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>,
}
impl MixnetClient {
pub(crate) fn new(
nym_address: Recipient,
client_input: ClientInput,
client_output: ClientOutput,
client_state: ClientState,
reconstructed_receiver: ReconstructedMessagesReceiver,
task_manager: TaskManager,
packet_type: Option<PacketType>,
) -> Self {
Self {
nym_address,
client_input,
client_output,
client_state,
reconstructed_receiver,
task_manager,
packet_type,
_buffered: Vec::new(),
}
}
/// Create a new client and connect to the mixnet using ephemeral in-memory keys that are
/// discarded at application close.
///
@@ -74,9 +100,10 @@ impl MixnetClient {
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
/// receive logic in different locations.
pub fn sender(&self) -> MixnetClientSender {
pub fn split_sender(&self) -> MixnetClientSender {
MixnetClientSender {
client_input: self.client_input.clone(),
packet_type: self.packet_type,
}
}
@@ -112,132 +139,6 @@ impl MixnetClient {
self.client_state.topology_accessor.release_manual_control()
}
/// Sends stringy data to the supplied Nym address
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet;
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// client.send_str(recipient, "hi").await;
/// }
/// ```
pub async fn send_str(&self, address: Recipient, message: &str) {
self.send_bytes(address, message, IncludedSurbs::default())
.await;
}
/// Sends bytes to the supplied Nym address. There is the option to specify the number of
/// reply-SURBs to include.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet;
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let surbs = mixnet::IncludedSurbs::default();
/// client.send_bytes(recipient, "hi".to_owned().into_bytes(), surbs).await;
/// }
/// ```
pub async fn send_bytes<M: AsRef<[u8]>>(
&self,
address: Recipient,
message: M,
surbs: IncludedSurbs,
) {
let lane = TransmissionLane::General;
let input_msg = match surbs {
IncludedSurbs::Amount(surbs) => InputMessage::new_anonymous(
address,
message.as_ref().to_vec(),
surbs,
lane,
self.packet_type,
),
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(
address,
message.as_ref().to_vec(),
lane,
self.packet_type,
),
};
self.send(input_msg).await
}
/// Sends stringy reply data to the supplied anonymous recipient.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// // note: the tag is something you would have received from a remote client sending you surbs!
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
/// client.send_str_reply(tag, "hi").await;
/// }
/// ```
pub async fn send_str_reply(&self, recipient_tag: AnonymousSenderTag, message: &str) {
self.send_reply(recipient_tag, message).await;
}
/// Sends binary reply data to the supplied anonymous recipient.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// // note: the tag is something you would have received from a remote client sending you surbs!
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
/// client.send_reply(tag, b"hi").await;
/// }
/// ```
pub async fn send_reply<M: AsRef<[u8]>>(&self, recipient_tag: AnonymousSenderTag, message: M) {
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_reply(
recipient_tag,
message.as_ref().to_vec(),
lane,
self.packet_type,
);
self.send(input_msg).await
}
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
/// full customization.
async fn send(&self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
}
}
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
/// full customization.
///
/// Waits until the message is actually sent, or close to being sent, until returning.
///
/// NOTE: this not yet implemented.
#[allow(unused)]
async fn send_wait(&self, _message: InputMessage) {
todo!();
}
/// Wait for messages from the mixnet
pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
self.reconstructed_receiver.next().await
@@ -265,12 +166,63 @@ impl MixnetClient {
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
}
impl MixnetClientSender {
pub async fn send_input_message(&mut self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
impl Stream for MixnetClient {
type Item = ReconstructedMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(next) = self._buffered.pop() {
cx.waker().wake_by_ref();
return Poll::Ready(Some(next));
}
match ready!(Pin::new(&mut self.reconstructed_receiver).poll_next(cx)) {
None => Poll::Ready(None),
Some(mut msgs) => {
// the vector itself should never be empty
if let Some(next) = msgs.pop() {
// there's more than a single message - buffer them and wake the waker
// to get polled again immediately
if !msgs.is_empty() {
self._buffered = msgs;
cx.waker().wake_by_ref();
}
Poll::Ready(Some(next))
} else {
error!("the reconstructed messages vector is empty - please let the developers know if you see this message");
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}
}
#[async_trait]
impl MixnetMessageSender for MixnetClient {
fn packet_type(&self) -> Option<PacketType> {
self.packet_type
}
async fn send(&self, message: InputMessage) -> Result<()> {
self.client_input
.send(message)
.await
.map_err(|_| Error::MessageSendingFailure)
}
}
#[async_trait]
impl MixnetMessageSender for MixnetClientSender {
fn packet_type(&self) -> Option<PacketType> {
self.packet_type
}
async fn send(&self, message: InputMessage) -> Result<()> {
self.client_input
.send(message)
.await
.map_err(|_| Error::MessageSendingFailure)
}
}
+119
View File
@@ -0,0 +1,119 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::mixnet::{AnonymousSenderTag, IncludedSurbs, Recipient};
use crate::Result;
use async_trait::async_trait;
use nym_client_core::client::inbound_messages::InputMessage;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
// defined to guarantee common interface regardless of whether you're using the full client
// or just the sending handler
#[async_trait]
pub trait MixnetMessageSender {
fn packet_type(&self) -> Option<PacketType> {
None
}
/// Sends a [`InputMessage`] to the mixnet. This is the most low-level sending function, for
/// full customization.
async fn send(&self, message: InputMessage) -> Result<()>;
/// Sends data to the supplied Nym address with the default surb behaviour.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// client.send_plain_message(recipient, "hi").await.unwrap();
/// }
/// ```
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
{
self.send_message(address, message, IncludedSurbs::default())
.await
}
/// Sends bytes to the supplied Nym address. There is the option to specify the number of
/// reply-SURBs to include.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let address = "foobar";
/// let recipient = mixnet::Recipient::try_from_base58_string(address).unwrap();
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// let surbs = mixnet::IncludedSurbs::default();
/// client.send_message(recipient, "hi".to_owned().into_bytes(), surbs).await.unwrap();
/// }
/// ```
async fn send_message<M>(
&self,
address: Recipient,
message: M,
surbs: IncludedSurbs,
) -> Result<()>
where
M: AsRef<[u8]> + Send,
{
let lane = TransmissionLane::General;
let input_msg = match surbs {
IncludedSurbs::Amount(surbs) => InputMessage::new_anonymous(
address,
message.as_ref().to_vec(),
surbs,
lane,
self.packet_type(),
),
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(
address,
message.as_ref().to_vec(),
lane,
self.packet_type(),
),
};
self.send(input_msg).await
}
/// Sends reply data to the supplied anonymous recipient.
///
/// # Example
///
/// ```no_run
/// use nym_sdk::mixnet::{self, MixnetMessageSender};
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
/// // note: the tag is something you would have received from a remote client sending you surbs!
/// let tag = mixnet::AnonymousSenderTag::try_from_base58_string("foobar").unwrap();
/// client.send_reply(tag, b"hi").await.unwrap();
/// }
/// ```
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
{
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_reply(
recipient_tag,
message.as_ref().to_vec(),
lane,
self.packet_type(),
);
self.send(input_msg).await
}
}
@@ -3,7 +3,9 @@
// use nym_client::client::config::{BaseClientConfig, Config, GatewayEndpointConfig};
// use nym_client::client::{DirectClient, KeyManager, Recipient, ReconstructedMessage, SocketClient};
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, Recipient, ReconstructedMessage};
use nym_sdk::mixnet::{
IncludedSurbs, MixnetClient, MixnetMessageSender, Recipient, ReconstructedMessage,
};
use nym_service_providers_common::interface::{
ControlRequest, ControlResponse, ProviderInterfaceVersion, Request, Response, ResponseContent,
};
@@ -51,34 +53,34 @@ async fn main() -> anyhow::Result<()> {
// TODO: currently we HAVE TO use surbs unfortunately
println!("Sending 'Health' request...");
client
.send_bytes(
.send_message(
provider,
full_request_health.into_bytes(),
IncludedSurbs::new(10),
)
.await;
.await?;
let response = wait_for_control_response(&mut client).await;
println!("response to 'Health' request: {response:#?}");
println!("Sending 'BinaryInfo' request...");
client
.send_bytes(
.send_message(
provider,
full_request_binary_info.into_bytes(),
IncludedSurbs::none(),
)
.await;
.await?;
let response = wait_for_control_response(&mut client).await;
println!("response to 'BinaryInfo' request: {response:#?}");
println!("Sending 'SupportedRequestVersions' request...");
client
.send_bytes(
.send_message(
provider,
full_request_versions.into_bytes(),
IncludedSurbs::none(),
)
.await;
.await?;
let response = wait_for_control_response(&mut client).await;
println!("response to 'SupportedRequestVersions' request: {response:#?}");
@@ -1,7 +1,9 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, Recipient, ReconstructedMessage};
use nym_sdk::mixnet::{
IncludedSurbs, MixnetClient, MixnetMessageSender, Recipient, ReconstructedMessage,
};
use nym_service_providers_common::interface::{
ProviderInterfaceVersion, Request, Response, ResponseContent,
};
@@ -47,23 +49,23 @@ async fn main() -> anyhow::Result<()> {
);
println!("Sending 'OpenProxy' query...");
client
.send_bytes(
.send_message(
provider,
open_proxy_request.into_bytes(),
IncludedSurbs::new(10),
)
.await;
.await?;
let response = wait_for_response(&mut client).await;
println!("response to 'OpenProxy' query: {response:#?}");
println!("Sending 'Description' query...");
client
.send_bytes(
.send_message(
provider,
description_request.into_bytes(),
IncludedSurbs::none(),
)
.await;
.await?;
let response = wait_for_response(&mut client).await;
println!("response to 'Description' query: {response:#?}");
@@ -16,6 +16,7 @@ use log::warn;
use nym_bin_common::bin_info_owned;
use nym_client_core::config::disk_persistence::CommonClientPaths;
use nym_network_defaults::NymNetworkDetails;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_service_providers_common::interface::{
BinaryInformation, ProviderInterfaceVersion, Request, RequestVersion,
};
@@ -223,7 +224,7 @@ impl NRServiceProviderBuilder {
};
let stats_collector_clone = stats_collector.clone();
let mixnet_client_sender = mixnet_client.sender();
let mixnet_client_sender = mixnet_client.split_sender();
let self_address = *mixnet_client.nym_address();
// start the listener for mix messages
@@ -297,7 +298,7 @@ impl NRServiceProvider {
/// Listens for any messages from `mix_reader` that should be written back to the mix network
/// via the `websocket_writer`.
async fn mixnet_response_listener(
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mut mix_input_reader: MixProxyReader<MixnetMessage>,
stats_collector: Option<ServiceStatisticsCollector>,
) {
@@ -321,7 +322,7 @@ impl NRServiceProvider {
}
let response_message = msg.into_input_message();
mixnet_client_sender.send_input_message(response_message).await;
mixnet_client_sender.send(response_message).await.unwrap();
} else {
log::error!("Exiting: channel closed!");
break;
+13 -9
View File
@@ -2,7 +2,7 @@ use std::fmt;
use clap::{Parser, ValueEnum};
use nym_bin_common::output_format::OutputFormat;
use nym_sdk::mixnet::{self, IncludedSurbs};
use nym_sdk::mixnet::{self, IncludedSurbs, MixnetMessageSender};
use nym_service_providers_common::interface::{
ControlRequest, ControlResponse, ProviderInterfaceVersion, Request, Response, ResponseContent,
};
@@ -154,34 +154,37 @@ impl QueryClient {
async fn query_bin_info(&mut self) -> ControlResponse {
self.client
.send_bytes(
.send_message(
self.provider,
new_bin_info_request().into_bytes(),
IncludedSurbs::new(10),
)
.await;
.await
.unwrap();
wait_for_control_response(&mut self.client).await
}
async fn query_supported_versions(&mut self) -> ControlResponse {
self.client
.send_bytes(
.send_message(
self.provider,
new_supported_request_versions_request().into_bytes(),
IncludedSurbs::new(10),
)
.await;
.await
.unwrap();
wait_for_control_response(&mut self.client).await
}
async fn query_open_proxy(&mut self) -> QueryResponse {
self.client
.send_bytes(
.send_message(
self.provider,
new_open_proxy_request().into_bytes(),
IncludedSurbs::new(10),
)
.await;
.await
.unwrap();
let response = wait_for_socks5_response(&mut self.client).await;
response
.content
@@ -193,12 +196,13 @@ impl QueryClient {
async fn ping(&mut self) -> PingResponse {
let now = std::time::Instant::now();
self.client
.send_bytes(
.send_message(
self.provider,
new_ping_request().into_bytes(),
IncludedSurbs::new(5),
)
.await;
.await
.unwrap();
let response = wait_for_control_response(&mut self.client).await;
assert!(matches!(response, ControlResponse::Health));
let elapsed = now.elapsed();