Max/general abstraction updates (#5560)

- new instance of echo server with lib / cli split 
- echo server docs update 
- tcpproxy and echosever now listen for kill signal 
- ffi bindings of tcpproxy functions updated
This commit is contained in:
mfahampshire
2025-05-14 17:51:18 +02:00
committed by GitHub
parent ea90d7b558
commit 9b6b2117dd
25 changed files with 987 additions and 468 deletions
Generated
+8 -1
View File
@@ -2327,10 +2327,15 @@ dependencies = [
"bincode",
"bytecodec",
"bytes",
"clap",
"dashmap",
"dirs",
"futures",
"nym-bin-common",
"nym-crypto",
"nym-sdk",
"serde",
"tempfile",
"tokio",
"tokio-stream",
"tokio-util",
@@ -5641,6 +5646,7 @@ dependencies = [
"bs58",
"lazy_static",
"nym-bin-common",
"nym-crypto",
"nym-sdk",
"nym-sphinx-anonymous-replies",
"tokio",
@@ -5799,11 +5805,12 @@ dependencies = [
[[package]]
name = "nym-go-ffi"
version = "0.2.1"
version = "0.2.2"
dependencies = [
"anyhow",
"lazy_static",
"nym-bin-common",
"nym-crypto",
"nym-ffi-shared",
"nym-sdk",
"nym-sphinx-anonymous-replies",
+1 -7
View File
@@ -123,7 +123,6 @@ members = [
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/echo-server",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/mixnet-connectivity-check",
@@ -161,12 +160,7 @@ default-members = [
"tools/nymvisor",
]
exclude = [
"explorer",
"contracts",
"nym-wallet",
"cpu-cycles",
]
exclude = ["explorer", "contracts", "nym-wallet", "cpu-cycles"]
[workspace.package]
authors = ["Nym Technologies SA"]
@@ -10,11 +10,15 @@ This server was initially built for the `TcpProxy` tests, but can be useful for
Run `cargo build --release` from `nym/tools/echo-server`. The binary will be in the main workspace `target/release` dir.
## Run
All the server requires is a port to connect the TCP listener to. If no `<PATH_TO_ENV>` is supplied then the server defaults to using mainnet.
```sh
echo-server <PORT> <PATH_TO_ENV_FILE>
# e.g. ../../target/release/echo-server 9000 ../../envs/canary.env
Usage: echo-server [OPTIONS]
Options:
-g, --gateway <GATEWAY> Optional gateway to use
-c, --config-path <CONFIG_PATH> Optional config path to specify
-e, --env <ENV> Optional env file - defaults to Mainnet if None
-l, --listen-port <LISTEN_PORT> Listen port [default: 8080]
-h, --help Print help
```
## Logging
+2 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-go-ffi"
version = "0.2.1"
version = "0.2.2"
edition = "2021"
license.workspace = true
@@ -13,6 +13,7 @@ name = "nym_go_ffi"
uniffi = { workspace = true, features = ["cli"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-crypto = { path = "../../../common/crypto" }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
+3 -3
View File
@@ -401,7 +401,7 @@ func uniffiCheckChecksums() {
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_new_proxy_server(uniffiStatus)
})
if checksum != 40789 {
if checksum != 35729 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_new_proxy_server: UniFFI API checksum mismatch")
}
@@ -1003,9 +1003,9 @@ func NewProxyClientDefault(serverAddress string, env *string) error {
return _uniffiErr
}
func NewProxyServer(upstreamAddress string, configDir string, env *string) error {
func NewProxyServer(upstreamAddress string, configDir string, env *string, gateway *string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_new_proxy_server(FfiConverterStringINSTANCE.Lower(upstreamAddress), FfiConverterStringINSTANCE.Lower(configDir), FfiConverterOptionalStringINSTANCE.Lower(env), _uniffiStatus)
C.uniffi_nym_go_ffi_fn_func_new_proxy_server(FfiConverterStringINSTANCE.Lower(upstreamAddress), FfiConverterStringINSTANCE.Lower(configDir), FfiConverterOptionalStringINSTANCE.Lower(env), FfiConverterOptionalStringINSTANCE.Lower(gateway), _uniffiStatus)
return false
})
return _uniffiErr
+1
View File
@@ -104,6 +104,7 @@ void uniffi_nym_go_ffi_fn_func_new_proxy_server(
RustBuffer upstream_address,
RustBuffer config_dir,
RustBuffer env,
RustBuffer gateway,
RustCallStatus* out_status
);
+1 -1
View File
@@ -99,7 +99,7 @@ func main() {
}
// init a proxy server
build_serv_err := bindings.NewProxyServer(upstreamAddress, configDir, &env_path)
build_serv_err := bindings.NewProxyServer(upstreamAddress, configDir, &env_path, nil)
if build_serv_err != nil {
fmt.Println(build_serv_err)
return
+1 -1
View File
@@ -36,7 +36,7 @@ namespace bindings {
[Throws=GoWrapError]
void run_proxy_client();
[Throws=GoWrapError]
void new_proxy_server(string upstream_address, string config_dir, string? env);
void new_proxy_server(string upstream_address, string config_dir, string? env, string? gateway);
[Throws=GoWrapError]
string proxy_server_address();
[Throws=GoWrapError]
+25 -3
View File
@@ -4,6 +4,7 @@
// due to autogenerated code
#![allow(clippy::empty_line_after_doc_comments)]
use nym_crypto::asymmetric::ed25519;
use nym_sdk::mixnet::Recipient;
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
uniffi::include_scaffolding!("bindings");
@@ -141,10 +142,31 @@ fn new_proxy_server(
upstream_address: String,
config_dir: String,
env: Option<String>,
gateway: Option<String>,
) -> Result<(), GoWrapError> {
match nym_ffi_shared::proxy_server_new_internal(&upstream_address, &config_dir, env) {
Ok(_) => Ok(()),
Err(_) => Err(GoWrapError::ServerInitError {}),
if gateway.is_some() {
let gateway_key = match gateway {
Some(gateway_str) => match ed25519::PublicKey::from_base58_string(&gateway_str) {
Ok(key) => Ok(key),
Err(err) => Err(anyhow::anyhow!("Failed to parse gateway key: {}", err)),
},
None => Err(anyhow::anyhow!("Gateway string is None")),
};
match nym_ffi_shared::proxy_server_new_internal(
&upstream_address,
&config_dir,
env,
Some(gateway_key.expect("Couldn't unwrap gateway key")),
) {
Ok(_) => Ok(()),
Err(_) => Err(GoWrapError::ServerInitError {}),
}
} else {
match nym_ffi_shared::proxy_server_new_internal(&upstream_address, &config_dir, env, None) {
Ok(_) => Ok(()),
Err(_) => Err(GoWrapError::ServerInitError {}),
}
}
}
+1
View File
@@ -10,6 +10,7 @@ tokio = { workspace = true, features = ["full"] }
# Nym clients, addressing, packet format, common tools (logging)
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-bin-common = { path = "../../../common/bin-common" }
nym-crypto = { path = "../../../common/crypto" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
# static var macro
lazy_static = { workspace = true }
+5 -1
View File
@@ -7,6 +7,8 @@ use nym_sdk::mixnet::{
MixnetClient, MixnetClientBuilder, MixnetMessageSender, Recipient, ReconstructedMessage,
StoragePaths,
};
use nym_crypto::asymmetric::ed25519;
use nym_sdk::tcp_proxy::{NymProxyClient, NymProxyServer};
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
use std::path::PathBuf;
@@ -221,12 +223,14 @@ pub fn proxy_server_new_internal(
upstream_address: &str,
config_dir: &str,
env: Option<String>,
gateway: Option<ed25519::PublicKey>,
) -> Result<(), Error> {
if NYM_PROXY_SERVER.lock().unwrap().as_ref().is_some() {
bail!("proxy client already exists");
} else {
RUNTIME.block_on(async move {
let init_proxy_server = NymProxyServer::new(upstream_address, config_dir, env).await?;
let init_proxy_server =
NymProxyServer::new(upstream_address, config_dir, env, gateway).await?;
let mut client = NYM_PROXY_SERVER.try_lock();
if let Ok(ref mut client) = client {
**client = Some(init_proxy_server);
+5 -5
View File
@@ -18,9 +18,9 @@ path = "src/tcp_proxy/bin/proxy_client.rs"
async-trait = { workspace = true }
bip39 = { workspace = true }
nym-client-core = { path = "../../../common/client-core", features = [
"fs-credentials-storage",
"fs-surb-storage",
"fs-gateways-storage",
"fs-credentials-storage",
"fs-surb-storage",
"fs-gateways-storage",
] }
nym-crypto = { path = "../../../common/crypto" }
nym-gateway-requests = { path = "../../../common/gateway-requests" }
@@ -36,14 +36,14 @@ nym-task = { path = "../../../common/task" }
nym-topology = { path = "../../../common/topology" }
nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
nym-validator-client = { path = "../../../common/client-libs/validator-client", features = [
"http-client",
"http-client",
] }
nym-socks5-requests = { path = "../../../common/socks5/requests" }
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
nym-service-providers-common = { path = "../../../service-providers/common" }
nym-sphinx-addressing = { path = "../../../common/nymsphinx/addressing" }
nym-bin-common = { path = "../../../common/bin-common", features = [
"basic_tracing",
"basic_tracing",
] }
bytecodec = { workspace = true }
httpcodec = { workspace = true }
@@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
// Within the TcpProxyClient, individual client shutdown is triggered by the timeout. The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy.
let proxy_client =
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env), 2).await?;
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 80, Some(env), 3).await?;
// For our disconnect() logic below
let proxy_clone = proxy_client.clone();
@@ -80,7 +80,7 @@ async fn main() -> anyhow::Result<()> {
println!("done. sending bytes");
// In the info traces you will see the different session IDs being set up, one for each TcpStream.
for i in 0..8 {
for i in 0..4 {
let client_cancel_inner_token = client_cancel_token.clone();
if client_cancel_token.is_cancelled() {
break;
@@ -101,7 +101,7 @@ async fn main() -> anyhow::Result<()> {
// Lets just send a bunch of messages to the server with variable delays between them, with a message and tcp connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but irl most of the time you'll probably be parsing on e.g. the incoming response type instead)
tokio::spawn(async move {
for i in 0..8 {
for i in 0..4 {
if client_cancel_inner_token.is_cancelled() {
break;
}
@@ -132,7 +132,7 @@ async fn main() -> anyhow::Result<()> {
match bincode::deserialize::<ExampleMessage>(&bytes) {
Ok(msg) => {
reply_counter += 1;
println!("<< conn {} received {}/8", msg.tcp_conn, reply_counter);
println!("<< conn {} received {}/4", msg.tcp_conn, reply_counter);
}
Err(e) => {
println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
@@ -147,12 +147,18 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
}
loop {
if example_cancel_token.is_cancelled() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Ok(())
}
// emulate a series of small messages followed by a closing larger one
fn gen_bytes_fixed(i: usize) -> Vec<u8> {
let amounts = [10, 15, 50, 1000, 10, 15, 500, 2000];
let amounts = [10, 15, 50, 1000, 2000];
let len = amounts[i];
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen::<u8>()).collect()
@@ -55,9 +55,13 @@ async fn main() -> anyhow::Result<()> {
let env = env_path.to_string();
let client_port = env::args().nth(3).expect("Port not specified");
let mut proxy_server =
tcp_proxy::NymProxyServer::new(&upstream_tcp_addr, &conf_path, Some(env_path.clone()))
.await?;
let mut proxy_server = tcp_proxy::NymProxyServer::new(
&upstream_tcp_addr,
&conf_path,
Some(env_path.clone()),
None,
)
.await?;
let proxy_nym_addr = proxy_server.nym_address();
// We'll run the instance with a long timeout since we're sending everything down the same Tcp connection, so should be using a single session.
@@ -1,5 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::mixnet::{MixnetClient, MixnetClientBuilder, NymNetworkDetails};
use anyhow::Result;
use nym_crypto::asymmetric::ed25519;
use std::fmt;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -108,6 +112,49 @@ impl ClientPool {
}
}
// Even though this is basically start() with an extra param since I think this
// will only be used for testing scenarios, and I didn't want to unnecessarily add
// another param to the function that will be used elsewhere, hence this is its own fn
pub async fn start_with_specified_gateway(&self, gateway: ed25519::PublicKey) -> Result<()> {
loop {
let spawned_clients = self.clients.read().await.len();
let addresses = self;
debug!(
"Currently spawned clients: {}: {:?}",
spawned_clients, addresses
);
if self.cancel_token.is_cancelled() {
break Ok(());
}
if spawned_clients >= self.client_pool_reserve_number {
debug!("Got enough clients already: sleeping");
} else {
info!(
"Clients in reserve = {}, reserve amount = {}, spawning new client",
spawned_clients, self.client_pool_reserve_number
);
let client = loop {
let net = NymNetworkDetails::new_from_env();
match MixnetClientBuilder::new_ephemeral()
.network_details(net)
.request_gateway(gateway.to_string())
.build()?
.connect_to_mixnet()
.await
{
Ok(client) => break client,
Err(err) => {
warn!("Error creating client: {:?}, will retry in 100ms", err);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
};
self.clients.write().await.push(Arc::new(client));
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
pub async fn disconnect_pool(&self) {
info!("Triggering Client Pool disconnect");
self.cancel_token.cancel();
+266 -205
View File
@@ -1,220 +1,281 @@
//! use nym_sdk::tcp_proxy;
//! use rand::rngs::SmallRng;
//! use rand::{Rng, SeedableRng};
//! use serde::{Deserialize, Serialize};
//! use std::env;
//! use std::fs;
//! use std::sync::atomic::{AtomicU8, Ordering};
//! use tokio::io::AsyncWriteExt;
//! use tokio::net::{TcpListener, TcpStream};
//! use tokio::signal;
//! The TcpProxy Module of the Nym SDK which exposes a socket interface for the Mixnet
//!
//! # Basic Example
//!
//! ```no_run
//! use nym_sdk::client_pool::ClientPool;
//! use nym_sdk::mixnet::{IncludedSurbs, MixnetClientBuilder, MixnetMessageSender, NymNetworkDetails};
//! use nym_sdk::tcp_proxy::utils::{MessageBuffer, Payload, ProxiedMessage};
//! use anyhow::Result;
//! use dashmap::DashSet;
//! use nym_network_defaults::setup_env;
//! use nym_sphinx::addressing::Recipient;
//! use std::sync::Arc;
//! use tokio::{
//! net::{TcpListener, TcpStream},
//! sync::oneshot,
//! };
//! use tokio_stream::StreamExt;
//! use tokio_util::codec;
//! use tokio_util::codec::{BytesCodec, FramedRead};
//! use tokio_util::sync::CancellationToken;
//! use tracing_subscriber::{fmt, prelude::*, EnvFilter};
//! #[derive(Serialize, Deserialize, Debug)]
//! struct ExampleMessage {
//! message_id: i8,
//! message_bytes: Vec<u8>,
//! use tracing::{debug, info, instrument};
//!
//! const DEFAULT_CLOSE_TIMEOUT: u64 = 60; // seconds
//! const DEFAULT_LISTEN_HOST: &str = "127.0.0.1";
//! const DEFAULT_LISTEN_PORT: &str = "8080";
//! const DEFAULT_CLIENT_POOL_SIZE: usize = 2;
//!
//! #[derive(Clone)]
//! pub struct NymProxyClient {
//! server_address: Recipient,
//! listen_address: String,
//! listen_port: String,
//! close_timeout: u64,
//! conn_pool: ClientPool,
//! cancel_token: CancellationToken,
//! }
//! // This is a basic example which opens a single TCP connection and writes a bunch of messages between a client and an echo
//! // server, so only uses a single session under the hood and doesn't really show off the message ordering capabilities; this is mainly
//! // just a quick introductory illustration on how:
//! // - the mixnet does message ordering
//! // - the NymProxyClient and NymProxyServer can be hooked into and used to communicate between two otherwise pretty vanilla TcpStreams
//! //
//! // For a more irl example checkout tcp_proxy_multistream.rs
//! //
//! // Run this with:
//! // `cargo run --example tcp_proxy_single_connection <SERVER_LISTEN_PORT> <ENV_FILE_PATH> <CLIENT_LISTEN_PATH>` e.g.
//! // `cargo run --example tcp_proxy_single_connection 8081 ../../../envs/canary.env 8080 `
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//! // Keep track of sent/received messages
//! let counter = AtomicU8::new(0);
//! // Comment this out to just see println! statements from this example, as Nym client logging is very informative but quite verbose.
//! // The Message Decay related logging gives you an ideas of the internals of the proxy message ordering. To see the contents of the msg buffer, sphinx packet chunking, etc change the tracing::Level to DEBUG.
//! tracing_subscriber::registry()
//! .with(fmt::layer())
//! .with(EnvFilter::new("nym_sdk::tcp_proxy=info"))
//! .init();
//! let server_port = env::args()
//! .nth(1)
//! .expect("Server listen port not specified");
//! let upstream_tcp_addr = format!("127.0.0.1:{}", server_port);
//! // This dir gets cleaned up at the end: NOTE if you switch env between tests without letting the file do the automatic cleanup, make sure to manually remove this directory up before running again, otherwise your client will attempt to use these keys for the new env
//! let home_dir = dirs::home_dir().expect("Unable to get home directory");
//! let conf_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display());
//! let env_path = env::args().nth(2).expect("Env file not specified");
//! let env = env_path.to_string();
//! let client_port = env::args().nth(3).expect("Port not specified");
//! let mut proxy_server =
//! tcp_proxy::NymProxyServer::new(&upstream_tcp_addr, &conf_path, Some(env_path.clone()))
//! .await?;
//! let proxy_nym_addr = proxy_server.nym_address();
//! // We'll run the instance with a long timeout since we're sending everything down the same Tcp connection, so should be using a single session.
//! // Within the TcpProxyClient, individual client shutdown is triggered by the timeout.
//! // The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy.
//! let proxy_client =
//! tcp_proxy::NymProxyClient::new(*proxy_nym_addr, "127.0.0.1", &client_port, 5, Some(env), 1)
//! .await?;
//! // For our disconnect() logic below
//! let proxy_clone = proxy_client.clone();
//! tokio::spawn(async move {
//! proxy_server.run_with_shutdown().await?;
//! Ok::<(), anyhow::Error>(())
//! });
//! tokio::spawn(async move {
//! proxy_client.run().await?;
//! Ok::<(), anyhow::Error>(())
//! });
//! let example_cancel_token = CancellationToken::new();
//! let server_cancel_token = example_cancel_token.clone();
//! let client_cancel_token = example_cancel_token.clone();
//! let watcher_cancel_token = example_cancel_token.clone();
//! // Cancel listener thread
//! tokio::spawn(async move {
//! signal::ctrl_c().await?;
//! println!(":: CTRL_C received, shutting down + cleanup up proxy server config files");
//! fs::remove_dir_all(conf_path)?;
//! watcher_cancel_token.cancel();
//! proxy_clone.disconnect().await;
//! Ok::<(), anyhow::Error>(())
//! });
//! // 'Server side' thread: echo back incoming as response to the messages sent in the 'client side' thread below
//! tokio::spawn(async move {
//! let listener = TcpListener::bind(upstream_tcp_addr).await?;
//!
//! impl NymProxyClient {
//! pub async fn new(
//! server_address: Recipient,
//! listen_address: &str,
//! listen_port: &str,
//! close_timeout: u64,
//! env: Option<String>,
//! default_client_amount: usize,
//! ) -> Result<Self> {
//! debug!("Loading env file: {:?}", env);
//! setup_env(env); // Defaults to mainnet if empty
//! Ok(NymProxyClient {
//! server_address,
//! listen_address: listen_address.to_string(),
//! listen_port: listen_port.to_string(),
//! close_timeout,
//! conn_pool: ClientPool::new(default_client_amount),
//! cancel_token: CancellationToken::new(),
//! })
//! }
//!
//! // server_address is the Nym address of the NymProxyServer to communicate with.
//! pub async fn new_with_defaults(server_address: Recipient, env: Option<String>) -> Result<Self> {
//! NymProxyClient::new(
//! server_address,
//! DEFAULT_LISTEN_HOST,
//! DEFAULT_LISTEN_PORT,
//! DEFAULT_CLOSE_TIMEOUT,
//! env,
//! DEFAULT_CLIENT_POOL_SIZE,
//! )
//! .await
//! }
//!
//! pub async fn run(&self) -> Result<()> {
//! info!("Connecting to mixnet server at {}", self.server_address);
//!
//! let listener =
//! TcpListener::bind(format!("{}:{}", self.listen_address, self.listen_port)).await?;
//!
//! let client_maker = self.conn_pool.clone();
//! tokio::spawn(async move {
//! client_maker.start().await?;
//! Ok::<(), anyhow::Error>(())
//! });
//!
//! loop {
//! if server_cancel_token.is_cancelled() {
//! break;
//! }
//! let (socket, _) = listener.accept().await.unwrap();
//! let (read, mut write) = socket.into_split();
//! let codec = codec::BytesCodec::new();
//! let mut framed_read = codec::FramedRead::new(read, codec);
//! while let Some(Ok(bytes)) = framed_read.next().await {
//! match bincode::deserialize::<ExampleMessage>(&bytes) {
//! Ok(msg) => {
//! println!(
//! "<< server received {}: {} bytes",
//! msg.message_id,
//! msg.message_bytes.len()
//! );
//! let msg = ExampleMessage {
//! message_id: msg.message_id,
//! message_bytes: msg.message_bytes,
//! };
//! let serialised = bincode::serialize(&msg)?;
//! write
//! .write_all(&serialised)
//! .await
//! .expect("couldnt send reply");
//! println!(
//! ">> server sent {}: {} bytes",
//! msg.message_id,
//! msg.message_bytes.len()
//! );
//! }
//! Err(e) => {
//! println!("<< server received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
//! }
//! tokio::select! {
//! stream = listener.accept() => {
//! let (stream, _) = stream?;
//! tokio::spawn(NymProxyClient::handle_incoming(
//! stream,
//! self.server_address,
//! self.close_timeout,
//! self.conn_pool.clone(),
//! self.cancel_token.clone(),
//! ));
//! }
//! _ = self.cancel_token.cancelled() => {
//! break Ok(());
//! }
//! }
//! }
//! #[allow(unreachable_code)]
//! Ok::<(), anyhow::Error>(())
//! });
//! // Just wait for Nym clients to connect, TCP clients to bind, etc. If there isn't a client in the pool (or you started it with 0) already then the TcpProxyClient just spins up an ephemeral client itself.
//! println!("waiting for everything to be set up..");
//! tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
//! println!("done. sending bytes");
//! // Now the client and server proxies are running we can create and pipe traffic to/from
//! // a socket on the same port as our ProxyClient instance as if we were just communicating
//! // between a client and host via a normal TcpStream - albeit with a decent amount of additional latency.
//! //
//! // The assumption regarding integration is that you know what you're sending, and will do proper
//! // framing before and after, know what data types you're expecting, etc; the proxies are just piping bytes
//! // back and forth using tokio's `Bytecodec` under the hood.
//! let local_tcp_addr = format!("127.0.0.1:{}", client_port);
//! let stream = TcpStream::connect(local_tcp_addr).await?;
//! let (read, mut write) = stream.into_split();
//! // 'Client side' thread; lets just send a bunch of messages to the server with variable delays between them, with an id to keep track of ordering in the printlns; the mixnet only guarantees message delivery, not ordering. You might not be necessarily streaming traffic in this manner IRL, but this example is a good illustration of how messages travel through the mixnet.
//! // - On the level of individual messages broken into multiple packets, the Proxy abstraction deals with making sure that everything is sent between the sockets in the //! corrent order.
//! // - On the level of different messages, this is not enforced: you might see in the logs that message 1 arrives at the server and is reconstructed after message 2.
//! tokio::spawn(async move {
//! let mut rng = SmallRng::from_entropy();
//! for i in 0..10 {
//! if client_cancel_token.is_cancelled() {
//! break;
//! }
//! let random_bytes = gen_bytes_fixed(i as usize);
//! let msg = ExampleMessage {
//! message_id: i,
//! message_bytes: random_bytes,
//! };
//! let serialised = bincode::serialize(&msg)?;
//! write
//! .write_all(&serialised)
//! .await
//! .expect("couldn't write to stream");
//! println!(">> client sent {}: {} bytes", &i, msg.message_bytes.len());
//! let delay = rng.gen_range(3.0..7.0);
//! tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
//! }
//! Ok::<(), anyhow::Error>(())
//! });
//! let codec = codec::BytesCodec::new();
//! let mut framed_read = codec::FramedRead::new(read, codec);
//! while let Some(Ok(bytes)) = framed_read.next().await {
//! match bincode::deserialize::<ExampleMessage>(&bytes) {
//! Ok(msg) => {
//! println!(
//! "<< client received {}: {} bytes",
//! msg.message_id,
//! msg.message_bytes.len()
//! );
//! counter.fetch_add(1, Ordering::SeqCst);
//! println!(
//! ":: messages received back: {:?}/10",
//! counter.load(Ordering::SeqCst)
//! );
//! }
//! Err(e) => {
//! println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
//! }
//! }
//! }
//! Ok(())
//! }
//! fn gen_bytes_fixed(i: usize) -> Vec<u8> {
//! let amounts = [158, 1088, 505, 1001, 150, 200, 3500, 500, 750, 100];
//! let len = amounts[i];
//! let mut rng = rand::thread_rng();
//! (0..len).map(|_| rng.gen::<u8>()).collect()
//!
//! pub async fn disconnect(&self) {
//! self.cancel_token.cancel();
//! self.conn_pool.disconnect_pool().await;
//! }
//!
//! // The main body of our logic, triggered on each accepted incoming tcp connection. To deal with assumptions about
//! // streaming we have to implement an abstract session for each set of outgoing messages atop each connection, with message
//! // IDs to deal with the fact that the mixnet does not enforce message ordering.
//! //
//! // There is an initial thread which does a bunch of setup logic
//! // - Create a random session ID
//! // - Create a Nym Client (and split into read/write clients for concurrent read/write)
//! // - Split incoming TcpStream into OwnedReadHalf and OwnedWriteHalf for concurrent read/write
//! //
//! // Then we spawn 2 tasks:
//! // - 'Outgoing' thread => frames incoming bytes from OwnedReadHalf and pipe through the mixnet & trigger session close.
//! // - 'Incoming' thread => orders incoming messages from the Mixnet via placing them in a MessageBuffer and using tick(), as well as manage session closing.
//! #[instrument(skip(stream, server_address, close_timeout, conn_pool, cancel_token))]
//! async fn handle_incoming(
//! stream: TcpStream,
//! server_address: Recipient,
//! close_timeout: u64,
//! conn_pool: ClientPool,
//! cancel_token: CancellationToken,
//! ) -> Result<()> {
//! // ID for creation of session abstraction; new session ID per new connection accepted by our tcp listener above.
//! let session_id = uuid::Uuid::new_v4();
//!
//! // Used to communicate end of session between 'Outgoing' and 'Incoming' tasks
//! let (tx, mut rx) = oneshot::channel();
//!
//! info!("Starting session: {}", session_id);
//!
//! let mut client = match conn_pool.get_mixnet_client().await {
//! Some(client) => {
//! info!("Grabbed client {} from pool", client.nym_address());
//! client
//! }
//! None => {
//! info!("Not enough clients in pool, creating ephemeral client");
//! let net = NymNetworkDetails::new_from_env();
//! let client = MixnetClientBuilder::new_ephemeral()
//! .network_details(net)
//! .build()?
//! .connect_to_mixnet()
//! .await?;
//! info!(
//! "Using {} for the moment, created outside of the connection pool",
//! client.nym_address()
//! );
//! client
//! }
//! };
//!
//! // Split our tcpstream into OwnedRead and OwnedWrite halves for concurrent read/writing
//! let (read, mut write) = stream.into_split();
//! // Since we're just trying to pipe whatever bytes our client/server are normally sending to each other,
//! // the bytescodec is fine to use here; we're trying to avoid modifying this stream e.g. in the process of Sphinx packet
//! // creation and adding padding to the payload whilst also sidestepping the need to manually manage an intermediate buffer of the
//! // incoming bytes from the tcp stream and writing them to our server with our Nym client.
//! let codec = BytesCodec::new();
//! let mut framed_read = FramedRead::new(read, codec);
//! // Much like the tcpstream, split our Nym client into a sender and receiver for concurrent read/write
//! let sender = client.split_sender();
//! // The server / service provider address our client is sending messages to will remain static
//! let server_addr = server_address;
//! // Store outgoing messages in instance of Dashset abstraction
//! let messages_account = Arc::new(DashSet::new());
//! // Wrap in an Arc for memsafe concurrent access
//! let sent_messages_account = Arc::clone(&messages_account);
//!
//! // 'Outgoing' thread
//! tokio::spawn(async move {
//! let mut message_id = 0;
//! // While able to read from OwnedReadHalf of TcpStream:
//! // - increment our messageID - we need to ensure message ordering on both client and server.
//! // - create instance of ProxiedMessage abstraction with framed bytes: this is really just the message data payload in the form of those bytes
//! // & session and messageIDs.
//! // - Serialise + send message through the mixnet to the Service Provider.
//! // - Repeat these steps, but sending a message with a payload containing a Close signal for this session; since we have message ordering implemented
//! // we can fire off the session close signal without having to wait on making sure the server has received the rest of the messages.
//! // - Trigger our session timeout alert in the 'Incoming' thread select! loop via tx end of our oneshot channel.
//! while let Some(Ok(bytes)) = framed_read.next().await {
//! message_id += 1;
//! sent_messages_account.insert(message_id);
//! let message =
//! ProxiedMessage::new(Payload::Data(bytes.to_vec()), session_id, message_id);
//! let coded_message = bincode::serialize(&message)?;
//! sender
//! .send_message(server_addr, &coded_message, IncludedSurbs::Amount(100))
//! .await?;
//! info!(
//! "Sent message with id {} for session {} of {} bytes",
//! message_id,
//! session_id,
//! bytes.len()
//! );
//! }
//! message_id += 1;
//! let message = ProxiedMessage::new(Payload::Close, session_id, message_id);
//!
//! let coded_message = bincode::serialize(&message)?;
//! sender
//! .send_message(server_addr, &coded_message, IncludedSurbs::Amount(100))
//! .await?;
//!
//! info!("Closing read end of session: {}", session_id);
//! tx.send(true)
//! .map_err(|_| anyhow::anyhow!("Could not send close signal"))?;
//! Ok::<(), anyhow::Error>(())
//! });
//!
//! // 'Incoming' thread
//! tokio::spawn(async move {
//! // Abstraction containing logic ordering: all our incoming messages need to be parsed based on their messageIDs per session.
//! // All the message-ordering and time-tracking methods are defined in utils.rs, mostly used in .tick().
//! let mut msg_buffer = MessageBuffer::new();
//! // Select!-ing one of following options:
//! // - rx is triggered by tx to log the session will end in ARGS.close_timeout time, break from this loop to pass to loop below
//! // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write to OwnedWriteHalf.
//! // - If the cancel_token is in cancelled state, break and kick down to the loop below.
//! // - Call tick() once per 100ms if neither of the above have occurred.
//! loop {
//! tokio::select! {
//! _ = &mut rx => {
//! info!("Closing write end of session: {} in {} seconds", session_id, close_timeout);
//! break
//! }
//! Some(message) = client.next() => {
//! let message = bincode::deserialize::<ProxiedMessage>(&message.message)?;
//! msg_buffer.push(message);
//! msg_buffer.tick(&mut write).await?;
//! },
//! _ = cancel_token.cancelled() => {
//! info!("CTRL_C triggered in thread, triggering loop shutdown");
//! break
//! },
//! _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
//! msg_buffer.tick(&mut write).await?;
//! }
//! }
//! }
//! // Select!-ing one of following options:
//! // - Deserialise incoming mixnet message, push to msg buffer and tick() to order and write next messageID in line to OwnedWriteHalf.
//! // - If the cancel_token is in cancelled state, shutdown client for this thread.
//! // - Sleep for session timeout and return, kills thread with Ok(()).
//! loop {
//! tokio::select! {
//! Some(message) = client.next() => {
//! let message = bincode::deserialize::<ProxiedMessage>(&message.message)?;
//! msg_buffer.push(message);
//! msg_buffer.tick(&mut write).await?;
//! },
//! _ = cancel_token.cancelled() => {
//! info!("CTRL_C triggered in thread, triggering client shutdown");
//! client.disconnect().await;
//! return Ok::<(), anyhow::Error>(())
//! },
//! _ = tokio::time::sleep(tokio::time::Duration::from_secs(close_timeout)) => {
//! info!("Closing write end of session: {}", session_id);
//! info!("Triggering client shutdown");
//! client.disconnect().await;
//! return Ok::<(), anyhow::Error>(())
//! },
//! }
//! }
//! });
//! Ok(())
//! }
//! }
//! ```
mod tcp_proxy_client;
mod tcp_proxy_server;
pub mod utils;
pub use tcp_proxy_client::NymProxyClient;
pub use tcp_proxy_server::NymProxyServer;
pub use utils::{DecayWrapper, MessageBuffer, Payload, ProxiedMessage};
@@ -1,5 +1,6 @@
use anyhow::Result;
use clap::Parser;
use nym_crypto::asymmetric::ed25519;
use nym_sdk::tcp_proxy;
#[derive(Parser, Debug)]
@@ -15,6 +16,9 @@ struct Args {
/// Optional env filepath - if none is supplied then the proxy defaults to using mainnet else just use a path to one of the supplied files in envs/ e.g. ./envs/sandbox.env
#[clap(short, long)]
env_path: Option<String>,
#[clap(short, long)]
gateway: Option<ed25519::PublicKey>,
}
#[tokio::main]
@@ -30,6 +34,7 @@ async fn main() -> Result<()> {
&args.upstream_tcp_address,
&conf_path,
args.env_path.clone(),
args.gateway,
)
.await?;
@@ -1,12 +1,14 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::client_pool::ClientPool;
use crate::mixnet::{IncludedSurbs, MixnetClientBuilder, MixnetMessageSender, NymNetworkDetails};
use std::sync::Arc;
#[path = "utils.rs"]
mod utils;
use crate::tcp_proxy::utils::{MessageBuffer, Payload, ProxiedMessage};
use anyhow::Result;
use dashmap::DashSet;
use nym_network_defaults::setup_env;
use nym_sphinx::addressing::Recipient;
use std::sync::Arc;
use tokio::{
net::{TcpListener, TcpStream},
sync::oneshot,
@@ -15,7 +17,6 @@ use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::{MessageBuffer, Payload, ProxiedMessage};
const DEFAULT_CLOSE_TIMEOUT: u64 = 60; // seconds
const DEFAULT_LISTEN_HOST: &str = "127.0.0.1";
@@ -1,9 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::mixnet::{
AnonymousSenderTag, MixnetClient, MixnetClientBuilder, MixnetClientSender, MixnetMessageSender,
NymNetworkDetails, StoragePaths,
};
use anyhow::Result;
use dashmap::DashSet;
use nym_crypto::asymmetric::ed25519;
use nym_network_defaults::setup_env;
use nym_sphinx::addressing::Recipient;
use std::path::PathBuf;
@@ -12,6 +16,7 @@ use tokio::net::TcpStream;
use tokio::sync::watch::Receiver;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
#[allow(clippy::duplicate_mod)]
#[path = "utils.rs"]
@@ -26,6 +31,9 @@ pub struct NymProxyServer {
mixnet_client_sender: Arc<RwLock<MixnetClientSender>>,
tx: tokio::sync::watch::Sender<Option<(ProxiedMessage, AnonymousSenderTag)>>,
rx: tokio::sync::watch::Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>>,
cancel_token: CancellationToken,
shutdown_tx: tokio::sync::mpsc::Sender<()>,
shutdown_rx: tokio::sync::mpsc::Receiver<()>,
}
impl NymProxyServer {
@@ -33,20 +41,31 @@ impl NymProxyServer {
upstream_address: &str,
config_dir: &str,
env: Option<String>,
gateway: Option<ed25519::PublicKey>,
) -> Result<Self> {
info!("Creating client");
// We're wanting to build a client with a constant address, vs the ephemeral in-memory data storage of the NymProxyClient clients.
// Following a builder pattern, having to manually connect to the mixnet below.
// Optional selectable Gateway to use.
let config_dir = PathBuf::from(config_dir);
debug!("Loading env file: {:?}", env);
setup_env(env); // Defaults to mainnet if empty
let net = NymNetworkDetails::new_from_env();
let storage_paths = StoragePaths::new_from_dir(&config_dir)?;
let client = MixnetClientBuilder::new_with_default_storage(storage_paths)
.await?
.network_details(net)
.build()?;
let client = if let Some(gateway) = gateway {
MixnetClientBuilder::new_with_default_storage(storage_paths)
.await?
.network_details(net)
.request_gateway(gateway.to_string())
.build()?
} else {
MixnetClientBuilder::new_with_default_storage(storage_paths)
.await?
.network_details(net)
.build()?
};
let client = client.connect_to_mixnet().await?;
@@ -57,6 +76,9 @@ impl NymProxyServer {
let (tx, rx) =
tokio::sync::watch::channel::<Option<(ProxiedMessage, AnonymousSenderTag)>>(None);
// Our shutdown signal channel
let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1);
info!("Client created: {}", client.nym_address());
Ok(NymProxyServer {
@@ -66,31 +88,82 @@ impl NymProxyServer {
mixnet_client_sender: sender,
tx,
rx,
cancel_token: CancellationToken::new(),
shutdown_tx,
shutdown_rx,
})
}
pub fn nym_address(&self) -> &Recipient {
self.mixnet_client.nym_address()
}
pub async fn run_with_shutdown(&mut self) -> Result<()> {
let handle_token = self.cancel_token.child_token();
let upstream_address = self.upstream_address.clone();
let rx = self.rx();
let mixnet_sender = self.mixnet_client_sender();
let tx = self.tx.clone();
let session_map = self.session_map().clone();
pub fn mixnet_client_mut(&mut self) -> &mut MixnetClient {
&mut self.mixnet_client
}
let mut shutdown_rx =
std::mem::replace(&mut self.shutdown_rx, tokio::sync::mpsc::channel(1).1);
pub fn session_map(&self) -> &DashSet<Uuid> {
&self.session_map
}
// Then get the message stream: poll this for incoming messages
let message_stream = self.mixnet_client_mut();
pub fn mixnet_client_sender(&self) -> Arc<RwLock<MixnetClientSender>> {
Arc::clone(&self.mixnet_client_sender)
}
loop {
tokio::select! {
Some(()) = shutdown_rx.recv() => {
debug!("Received shutdown signal, stopping TcpProxyServer");
handle_token.cancel();
break;
}
// On our Mixnet client getting a new message:
// - Check if the attached sessionID exists.
// - If !sessionID, spawn a new session_handler() task.
// - Send the message down tx => rx in our handler.
message = message_stream.next() => {
if let Some(new_message) = message {
let message: ProxiedMessage = match bincode::deserialize(&new_message.message) {
Ok(msg) => {
debug!("received: {}", msg);
msg
},
Err(e) => {
error!("Failed to deserialize ProxiedMessage: {}", e);
continue;
}
};
pub fn tx(&self) -> tokio::sync::watch::Sender<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.tx.clone()
}
let session_id = message.session_id();
pub fn rx(&self) -> tokio::sync::watch::Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.rx.clone()
if session_map.insert(session_id) {
debug!("Got message for a new session");
tokio::spawn(Self::session_handler(
upstream_address.clone(),
session_id,
rx.clone(),
mixnet_sender.clone(),
handle_token.clone()
));
info!("Spawned a new session handler: {}", session_id);
}
debug!("Sending message for session {}", session_id);
if let Some(sender_tag) = new_message.sender_tag {
if let Err(e) = tx.send(Some((message, sender_tag))) {
error!("Failed to send ProxiedMessage: {}", e);
}
} else {
error!("No sender tag found, we can't send a reply without it!");
}
}
}
}
}
self.shutdown_rx = shutdown_rx;
Ok(())
}
// The main body of our logic, triggered on each received new sessionID. To deal with assumptions about
@@ -110,6 +183,7 @@ impl NymProxyServer {
session_id: Uuid,
mut rx: Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>>,
sender: Arc<RwLock<MixnetClientSender>>,
cancel_token: CancellationToken,
) -> Result<()> {
let global_surb = Arc::new(RwLock::new(None));
let stream = TcpStream::connect(upstream_address).await?;
@@ -181,6 +255,9 @@ impl NymProxyServer {
}
}
}
_ = cancel_token.cancelled() => {
break;
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
msg_buffer.tick(&mut write).await?;
}
@@ -191,39 +268,65 @@ impl NymProxyServer {
Ok(())
}
pub async fn run_with_shutdown(&mut self) -> Result<()> {
// On our Mixnet client getting a new message:
// - Check if the attached sessionID exists.
// - If !sessionID, spawn a new session_handler() task.
// - Send the message down tx => rx in our handler.
while let Some(new_message) = &self.mixnet_client_mut().next().await {
let message: ProxiedMessage = bincode::deserialize(&new_message.message)?;
let session_id = message.session_id();
// If we've already got message from an existing session, continue, else add it to the session mapping and spawn a new handler().
if self.session_map().contains(&message.session_id()) {
debug!("Got message for an existing session");
} else {
self.session_map().insert(message.session_id());
debug!("Got message for a new session");
tokio::spawn(Self::session_handler(
self.upstream_address.clone(),
session_id,
self.rx(),
self.mixnet_client_sender(),
));
info!("Spawned a new session handler: {}", message.session_id());
}
pub fn disconnect_signal(&self) -> tokio::sync::mpsc::Sender<()> {
self.shutdown_tx.clone()
}
debug!("Sending message for session {}", message.session_id());
pub fn nym_address(&self) -> &Recipient {
self.mixnet_client.nym_address()
}
if let Some(sender_tag) = new_message.sender_tag {
self.tx.send(Some((message, sender_tag)))?
} else {
error!("No sender tag found, we can't send a reply without it!")
}
}
pub fn mixnet_client_mut(&mut self) -> &mut MixnetClient {
&mut self.mixnet_client
}
pub fn session_map(&self) -> &DashSet<Uuid> {
&self.session_map
}
pub fn mixnet_client_sender(&self) -> Arc<RwLock<MixnetClientSender>> {
Arc::clone(&self.mixnet_client_sender)
}
pub fn tx(&self) -> tokio::sync::watch::Sender<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.tx.clone()
}
pub fn rx(&self) -> tokio::sync::watch::Receiver<Option<(ProxiedMessage, AnonymousSenderTag)>> {
self.rx.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn shutdown_works() -> Result<()> {
let config_dir = TempDir::new()?;
let mut server = NymProxyServer::new(
"127.0.0.1:8000",
config_dir.path().to_str().unwrap(),
None, // Mainnet
None, // Random gateway
)
.await?;
// Getter for shutdown signal tx
let shutdown_tx = server.disconnect_signal();
let server_handle = tokio::spawn(async move { server.run_with_shutdown().await });
// Let it start up
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// Kill server
shutdown_tx.send(()).await?;
// Wait for shutdown in handle + check Result != err
server_handle.await??;
tokio::signal::ctrl_c().await?;
Ok(())
}
}
+8 -6
View File
@@ -1,7 +1,9 @@
use std::{collections::HashSet, fmt, ops::Deref, time::Instant};
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, fmt, ops::Deref, time::Instant};
use tokio::{io::AsyncWriteExt as _, net::tcp::OwnedWriteHalf};
use tracing::{debug, info};
use uuid::Uuid;
@@ -11,7 +13,7 @@ const DEFAULT_DECAY: u64 = 6; // decay time in seconds
// Keeps track of
// - incoming and unsorted messages wrapped in DecayWrapper for keeping track of when they were received
// - the expected next message ID (reset on .tick())
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct MessageBuffer {
buffer: Vec<DecayWrapper<ProxiedMessage>>,
next_msg_id: u16,
@@ -148,9 +150,9 @@ impl<T> Deref for DecayWrapper<T> {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ProxiedMessage {
message: Payload,
session_id: Uuid,
message_id: u16,
pub message: Payload,
pub session_id: Uuid,
pub message_id: u16,
}
impl ProxiedMessage {
@@ -175,7 +177,7 @@ impl ProxiedMessage {
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum Payload {
Data(Vec<u8>),
Close,
+13
View File
@@ -9,6 +9,11 @@ edition.workspace = true
license.workspace = true
rust-version.workspace = true
[[bin]]
name = "echo-server"
path = "src/echo-server.rs"
[dependencies]
anyhow.workspace = true
dashmap.workspace = true
@@ -24,3 +29,11 @@ bytecodec = { workspace = true }
nym-sdk = { path = "../../sdk/rust/nym-sdk/" }
bytes.workspace = true
dirs.workspace = true
clap.workspace = true
nym-bin-common = { path = "../../common/bin-common", features = [
"basic_tracing",
"output_format",
] }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric"] }
futures = { workspace = true }
tempfile.workspace = true
+4 -6
View File
@@ -1,9 +1,7 @@
# Nym Echo Server
This is an initial minimal implementation of an echo server built using the `NymProxyServer` Rust SDK abstraction.
This is an initial, minimal implementation of an echo server built using the `NymProxyServer` Rust SDK abstraction.
## Usage
```
cargo build --release
../../target/release/echo-server <PORT> <PATH_TO_ENV_FILE> e.g. ../../target/release/echo-server 9000 ../../envs/canary.env
```
It currently relies on parsing out a `ProxiedMessage` from incoming messages, used by the `NymProxyClient`. In the future it will try and parse a `ReconstructedMessage` type, in order to allow standard `MixnetClient`s to receive echo messages.
You can find the docs [here](https://nym.com/docs/developers/tools/echo-server).
+47
View File
@@ -0,0 +1,47 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use anyhow::Result;
use clap::Parser;
use echo_server::NymEchoServer;
use nym_crypto::asymmetric::ed25519;
use tracing::info;
#[derive(Parser, Debug)]
struct Args {
/// Optional gateway to use
#[clap(short, long)]
gateway: Option<ed25519::PublicKey>,
/// Optional config path to specify
#[clap(short, long)]
config_path: Option<String>,
/// Optional env file - defaults to Mainnet if None
#[clap(short, long)]
env: Option<String>,
/// Listen port
#[clap(short, long, default_value = "8080")]
listen_port: String,
}
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
let args = Args::parse();
let mut echo_server = NymEchoServer::new(
args.gateway,
args.config_path.as_deref(),
args.env,
args.listen_port.as_str(),
)
.await?;
let echo_addr = echo_server.nym_address().await;
info!("listening on {echo_addr}");
echo_server.run().await?;
Ok(())
}
+359
View File
@@ -0,0 +1,359 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use anyhow::Result;
use nym_crypto::asymmetric::ed25519;
use nym_sdk::{mixnet::Recipient, tcp_proxy, tcp_proxy::NymProxyServer};
use std::{
fmt::Debug,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
sync::{broadcast, Mutex},
task,
};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
const METRICS_TICK: u8 = 6; // Tempo of metrics logging in seconds
#[derive(Debug)]
pub struct Metrics {
total_conn: AtomicU64,
bytes_recv: AtomicU64,
bytes_sent: AtomicU64,
}
impl Metrics {
fn new() -> Self {
Self {
total_conn: AtomicU64::new(0),
bytes_recv: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
}
}
}
pub struct NymEchoServer {
client: Arc<Mutex<NymProxyServer>>,
listen_addr: String,
metrics: Arc<Metrics>,
cancel_token: CancellationToken,
client_shutdown_tx: tokio::sync::mpsc::Sender<()>, // Shutdown signal for the TcpProxyServer instance
shutdown_tx: tokio::sync::mpsc::Sender<()>, // Shutdown signals for the EchoServer
shutdown_rx: tokio::sync::mpsc::Receiver<()>,
ready_tx: broadcast::Sender<()>, // Signal for upstream code if consuming the crate showing readiness
}
impl NymEchoServer {
pub async fn new(
gateway: Option<ed25519::PublicKey>,
config_path: Option<&str>,
env: Option<String>,
listen_port: &str,
) -> Result<Self> {
let home_dir = dirs::home_dir().expect("Unable to get home directory");
let default_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display());
let config_path = config_path.unwrap_or(&default_path);
let listen_addr = format!("127.0.0.1:{}", listen_port);
let client = Arc::new(Mutex::new(
tcp_proxy::NymProxyServer::new(&listen_addr, config_path, env, gateway).await?,
));
let client_shutdown_tx = client.lock().await.disconnect_signal();
let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1);
let (ready_tx, _) = broadcast::channel(1);
Ok(NymEchoServer {
client,
listen_addr,
metrics: Arc::new(Metrics::new()),
cancel_token: CancellationToken::new(),
client_shutdown_tx,
shutdown_tx,
shutdown_rx,
ready_tx,
})
}
pub async fn run(&mut self) -> Result<()> {
let cancel_token = self.cancel_token.clone();
let mut interval =
tokio::time::interval(tokio::time::Duration::from_secs(METRICS_TICK as u64));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let client = Arc::clone(&self.client);
task::spawn(async move {
client.lock().await.run_with_shutdown().await?;
Ok::<(), anyhow::Error>(())
});
let all_metrics = Arc::clone(&self.metrics);
let listener = TcpListener::bind(self.listen_addr.clone()).await?;
debug!("{listener:?}");
let mut shutdown_rx =
std::mem::replace(&mut self.shutdown_rx, tokio::sync::mpsc::channel(1).1);
info!("Ready to accept incoming traffic");
let _ = self.ready_tx.send(());
loop {
tokio::select! {
Some(()) = shutdown_rx.recv() => {
info!("Disconnect signal received");
self.cancel_token.cancel();
info!("Cancel token cancelled: killing handle_incoming loops");
self.client_shutdown_tx.send(()).await?;
info!("Sent shutdown signal to ProxyServer instance");
break;
}
stream = listener.accept() => {
let (stream, _) = stream?;
info!("Handling new stream");
let connection_metrics = Arc::clone(&self.metrics);
connection_metrics.total_conn.fetch_add(1, Ordering::Relaxed);
tokio::spawn(NymEchoServer::handle_incoming(
stream, connection_metrics, cancel_token.clone()
));
}
_ = interval.tick() => {
info!("Metrics: total_connections_since_start={}, bytes_received={}, bytes_sent={}",
all_metrics.total_conn.load(Ordering::Relaxed),
all_metrics.bytes_recv.load(Ordering::Relaxed),
all_metrics.bytes_sent.load(Ordering::Relaxed),
);
}
}
}
self.shutdown_rx = shutdown_rx;
Ok(())
}
async fn handle_incoming(
socket: TcpStream,
metrics: Arc<Metrics>,
cancel_token: CancellationToken,
) {
let (read, mut write) = socket.into_split();
let codec = tokio_util::codec::BytesCodec::new();
let mut framed_read = tokio_util::codec::FramedRead::new(read, codec);
loop {
tokio::select! {
Some(result) = framed_read.next() => {
match result {
Ok(bytes) => {
let len = bytes.len();
metrics.bytes_recv.fetch_add(len as u64, Ordering::Relaxed);
if let Err(e) = write.write_all(&bytes).await {
error!("Failed to write to stream with err: {}", e);
break;
}
metrics.bytes_sent.fetch_add(len as u64, Ordering::Relaxed);
}
Err(e) => {
error!("Failed to read from stream with err: {}", e);
break;
}
}
}
_ = cancel_token.cancelled() => {
info!("Shutdown signal received, closing connection");
break;
}
}
}
info!("Connection closed");
}
pub fn disconnect_signal(&self) -> tokio::sync::mpsc::Sender<()> {
self.shutdown_tx.clone()
}
pub async fn nym_address(&self) -> Recipient {
*self.client.lock().await.nym_address()
}
pub fn listen_addr(&self) -> String {
self.listen_addr.clone()
}
pub fn metrics(&self) -> Arc<Metrics> {
self.metrics.clone()
}
pub fn ready_signal(&self) -> broadcast::Receiver<()> {
self.ready_tx.subscribe()
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use nym_sdk::mixnet::{IncludedSurbs, MixnetClient, MixnetMessageSender};
use nym_sdk::tcp_proxy::{Payload, ProxiedMessage};
use tempfile::TempDir;
#[tokio::test]
async fn shutdown_works() -> Result<()> {
let config_dir = TempDir::new()?;
let mut echo_server = NymEchoServer::new(
None,
Some(config_dir.path().to_str().unwrap()),
None, // Mainnet by default
"9000",
)
.await
.unwrap();
// Getter for shutdown signal
let shutdown_tx = echo_server.disconnect_signal();
// Getter for ready signal
let mut ready_rx = echo_server.ready_signal();
// Start the echo serv
let server_handle = tokio::spawn(async move { echo_server.run().await.unwrap() });
// Wait until you can match on ready signal - you will see "Ready to accept incoming traffic" in echo server logs when running it as CLI
loop {
match ready_rx.try_recv() {
Ok(()) => {
println!("Server is ready!");
break;
}
Err(broadcast::error::TryRecvError::Empty) => {
// Channel is still empty, wait a bit and try again
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(broadcast::error::TryRecvError::Closed) => {
return Err(anyhow::anyhow!(
"Ready channel closed before server was ready"
));
}
Err(broadcast::error::TryRecvError::Lagged(_)) => {
// Broadcast channel was set before we checked but handle it anyway; server is ready
break;
}
}
}
// Kill server
shutdown_tx.send(()).await?;
// Wait for shutdown in handle
server_handle.await?;
Ok(())
}
#[tokio::test]
async fn echoes_bytes() -> Result<()> {
let config_dir = TempDir::new()?;
let mut echo_server = NymEchoServer::new(
None,
Some(config_dir.path().to_str().unwrap()),
None,
"9001",
)
.await
.unwrap();
let echo_addr = echo_server.nym_address().await;
let shutdown_tx = echo_server.disconnect_signal();
let mut ready_rx = echo_server.ready_signal();
let server_handle = tokio::task::spawn(async move {
echo_server.run().await.unwrap();
});
loop {
match ready_rx.try_recv() {
Ok(()) => {
println!("Server is ready!");
break;
}
Err(broadcast::error::TryRecvError::Empty) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(broadcast::error::TryRecvError::Closed) => {
return Err(anyhow::anyhow!(
"Ready channel closed before server was ready"
));
}
Err(broadcast::error::TryRecvError::Lagged(_)) => {
break;
}
}
}
println!("Sending message");
let session_id = uuid::Uuid::new_v4();
let message_id = 0;
let outgoing = ProxiedMessage::new(
Payload::Data("test".as_bytes().to_vec()),
session_id,
message_id,
);
let coded_message = bincode::serialize(&outgoing).unwrap();
println!("sending {:?}", coded_message);
let mut client = MixnetClient::connect_new().await.unwrap();
println!("sending client addr {}", client.nym_address());
let sender = client.split_sender();
let receiving_task_handle = tokio::spawn(async move {
println!("in handle");
if let Some(received) = client.next().await {
println!("{received:?}");
let incoming: ProxiedMessage = bincode::deserialize(&received.message).unwrap();
assert_eq!(outgoing.message, incoming.message);
}
println!("disconnecting client");
client.disconnect().await;
println!("client disconnected");
});
println!("after recv task handle");
let sending_task_handle = tokio::spawn(async move {
sender
.send_message(echo_addr, &coded_message, IncludedSurbs::Amount(10))
.await
.unwrap();
});
println!("after sending task handle");
receiving_task_handle.await.unwrap();
sending_task_handle.await.unwrap();
println!("after handles resolve");
shutdown_tx.send(()).await?;
println!("sent shutdown");
server_handle.await?;
Ok(())
}
}
-161
View File
@@ -1,161 +0,0 @@
use anyhow::Result;
use bytes::Bytes;
use nym_sdk::tcp_proxy;
use std::env;
use std::fs;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::signal;
use tokio::sync::broadcast;
use tokio::task;
use tokio_stream::StreamExt;
use tracing::{error, info, warn};
struct Metrics {
total_conn: AtomicU64,
active_conn: AtomicU64,
bytes_recv: AtomicU64,
bytes_sent: AtomicU64,
}
impl Metrics {
fn new() -> Self {
Self {
total_conn: AtomicU64::new(0),
active_conn: AtomicU64::new(0),
bytes_recv: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
// if you run this with DEBUG you see the msg buffer on the ProxyServer, but its quite chatty
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
let server_port = env::args()
.nth(1)
.expect("Server listen port not specified");
let tcp_addr = format!("127.0.0.1:{}", server_port);
// This dir gets cleaned up at the end: NOTE if you switch env between tests without letting the file do the automatic cleanup, make sure to manually remove this directory up before running again, otherwise your client will attempt to use these keys for the new env
let home_dir = dirs::home_dir().expect("Unable to get home directory");
let conf_path = format!("{}/tmp/nym-proxy-server-config", home_dir.display());
let env_path = env::args().nth(2).expect("Env file not specified");
let env = env_path.to_string();
let mut proxy_server = tcp_proxy::NymProxyServer::new(&tcp_addr, &conf_path, Some(env.clone()))
.await
.unwrap();
let proxy_nym_addr = *proxy_server.nym_address();
info!("ProxyServer listening out on {}", proxy_nym_addr);
task::spawn(async move {
proxy_server.run_with_shutdown().await?;
Ok::<(), anyhow::Error>(())
});
let (shutdown_sender, _) = broadcast::channel(1);
let metrics = Arc::new(Metrics::new());
let all_metrics = Arc::clone(&metrics);
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
info!(
"Metrics: total_connections={}, active_connections={}, bytes_received={}, bytes_sent={}",
all_metrics.total_conn.load(Ordering::Relaxed),
all_metrics.active_conn.load(Ordering::Relaxed),
all_metrics.bytes_recv.load(Ordering::Relaxed),
all_metrics.bytes_sent.load(Ordering::Relaxed),
);
}
});
let listener = TcpListener::bind(tcp_addr).await?;
loop {
tokio::select! {
_ = signal::ctrl_c() => {
info!("Shutdown signal received, closing server...");
let _ = shutdown_sender.send(());
// TODO we need something like this for the ProxyServer client
break;
}
Ok((socket, _)) = listener.accept() => {
let connection_metrics = Arc::clone(&metrics);
let shutdown_rx = shutdown_sender.subscribe();
connection_metrics.total_conn.fetch_add(1, Ordering::Relaxed);
connection_metrics.active_conn.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
handle_incoming(socket, connection_metrics, shutdown_rx).await;
});
}
}
}
signal::ctrl_c().await?;
info!("Received CTRL+C");
fs::remove_dir_all(conf_path)?;
while metrics.active_conn.load(Ordering::Relaxed) > 0 {
info!("Waiting on active connections to close: sleeping 100ms");
// TODO some kind of hard kill here for the ProxyServer
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok(())
}
async fn handle_incoming(
socket: TcpStream,
metrics: Arc<Metrics>,
mut shutdown_rx: broadcast::Receiver<()>,
) {
let (read, mut write) = socket.into_split();
let codec = tokio_util::codec::BytesCodec::new();
let mut framed_read = tokio_util::codec::FramedRead::new(read, codec);
loop {
tokio::select! {
Some(result) = framed_read.next() => {
match result {
Ok(bytes) => {
let len = bytes.len();
metrics.bytes_recv.fetch_add(len as u64, Ordering::Relaxed);
if let Err(e) = write.write_all(&bytes).await {
error!("Failed to write to stream with err: {}", e);
break;
}
metrics.bytes_sent.fetch_add(len as u64, Ordering::Relaxed);
}
Err(e) => {
error!("Failed to read from stream with err: {}", e);
break;
}
}
}
_ = shutdown_rx.recv() => {
warn!("Shutdown signal received, closing connection");
break;
}
// TODO need to work out a way that if this timesout and breaks but you dont hang up the conn on the client end you can reconnect..maybe. If we just use this as a ping echo server I dont think this is a problem
// EDIT I'm not actually sure we want this functionality? Measuring active connections might be useful though
_ = tokio::time::sleep(tokio::time::Duration::from_secs(120)) => {
info!("Timeout reached, assuming we wont get more messages on this conn, closing");
let close_message = "Closing conn, reconnect if you want to ping again";
let bytes: Bytes = close_message.into();
write.write_all(&bytes).await.expect("Couldn't write to socket");
break;
}
}
}
metrics
.active_conn
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
info!("Connection closed");
}