7c890ea0c5
* First sweep packages + some minor tweaking * Second sweep * Regenerate lockfile + package.json mods * Regenerate lockfile again * Fix CI * Fix CI again * All building properly * unblock * Tweak examples * Comments + readme + fix rotten unit test * First pass docs * Big pass * Massive pass on new docs * Update integrations.md w mobile * Partial overhaul review * new playground + big pass * new fix lychee err * IPR notice tweak
301 lines
9.2 KiB
Plaintext
301 lines
9.2 KiB
Plaintext
---
|
|
title: "Client Pool Tutorial: Handle Bursty Traffic"
|
|
description: "Step-by-step Rust tutorial to use Nym ClientPool for handling bursts of concurrent mixnet operations without blocking on client creation."
|
|
schemaType: "HowTo"
|
|
section: "Developers"
|
|
lastUpdated: "2026-04-17"
|
|
---
|
|
|
|
# Tutorial: Handle Bursty Traffic with Client Pool
|
|
|
|
import { Callout } from 'nextra/components'
|
|
import { CodeVerified } from '../../../../components/code-verified'
|
|
import { RUST_MSRV } from '../../../../components/versions'
|
|
|
|
A program that uses `ClientPool` to absorb bursts of concurrent Mixnet operations without paying client-creation latency on the hot path. The pool pre-creates clients in the background; tasks pop them under load; the tutorial also walks through what happens when demand outruns supply.
|
|
|
|
## What you'll learn
|
|
|
|
- Creating and starting a `ClientPool`
|
|
- Popping clients from the pool for concurrent operations
|
|
- Falling back to on-demand client creation when the pool is empty
|
|
- Observing pool replenishment
|
|
- Graceful shutdown
|
|
|
|
<CodeVerified />
|
|
|
|
## Prerequisites
|
|
|
|
- Rust toolchain ({RUST_MSRV}+)
|
|
- A working internet connection
|
|
|
|
## Step 1: Set up the project
|
|
|
|
```sh
|
|
cargo init nym-pool-demo
|
|
cd nym-pool-demo
|
|
```
|
|
|
|
Add dependencies to `Cargo.toml`:
|
|
|
|
```toml
|
|
[dependencies]
|
|
nym-sdk = "1.21.1"
|
|
nym-network-defaults = "1.21.1"
|
|
nym-bin-common = { version = "1.21.1", features = ["basic_tracing"] }
|
|
tokio = { version = "1", features = ["full"] }
|
|
```
|
|
|
|
## Step 2: Create and start the pool
|
|
|
|
The pool is created with a **reserve size**: the number of connected clients it tries to maintain at all times. The `start()` method runs a background loop that creates clients whenever the pool drops below the reserve.
|
|
|
|
Create `src/main.rs`:
|
|
|
|
```rust
|
|
use nym_sdk::client_pool::ClientPool;
|
|
use nym_sdk::mixnet::MixnetMessageSender;
|
|
use nym_network_defaults::setup_env;
|
|
use std::time::Duration;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
nym_bin_common::logging::setup_tracing_logger();
|
|
|
|
// Load mainnet network defaults into env vars (required by ClientPool)
|
|
setup_env(None::<String>);
|
|
|
|
// Create a pool that maintains 3 clients in reserve
|
|
let pool = ClientPool::new(3);
|
|
|
|
// Start the pool in a background task.
|
|
// It immediately begins connecting clients.
|
|
let pool_bg = pool.clone();
|
|
tokio::spawn(async move {
|
|
pool_bg.start().await.unwrap();
|
|
});
|
|
|
|
println!("Pool started, waiting for clients to connect...");
|
|
tokio::time::sleep(Duration::from_secs(15)).await;
|
|
|
|
// Check how many are ready
|
|
let count = pool.get_client_count().await;
|
|
println!("Pool has {count} clients ready");
|
|
```
|
|
|
|
<Callout type="info">
|
|
Creating a `MixnetClient` takes several seconds (gateway handshake, key generation, topology fetch). The pool does this work ahead of time so your application doesn't block when it needs a client.
|
|
</Callout>
|
|
|
|
## Step 3: Pop clients and use them
|
|
|
|
When you call `get_mixnet_client()`, the pool removes a client and returns it. The background loop notices the shortfall and starts creating a replacement.
|
|
|
|
```rust
|
|
// Simulate a burst of 3 concurrent tasks, each needing a client
|
|
let mut handles = vec![];
|
|
|
|
for i in 1..=3 {
|
|
let pool = pool.clone();
|
|
|
|
let handle = tokio::spawn(async move {
|
|
// Pop a client from the pool
|
|
let mut client = match pool.get_mixnet_client().await {
|
|
Some(c) => {
|
|
println!("Task {i}: got client {} from pool", c.nym_address());
|
|
c
|
|
}
|
|
None => {
|
|
// Pool is empty; fall back to creating one on the fly.
|
|
// This is slower but keeps things working.
|
|
println!("Task {i}: pool empty, creating client on the fly...");
|
|
nym_sdk::mixnet::MixnetClient::connect_new().await.unwrap()
|
|
}
|
|
};
|
|
|
|
// Do something with the client. Here, send a message to ourselves.
|
|
let addr = *client.nym_address();
|
|
client
|
|
.send_plain_message(addr, format!("hello from task {i}"))
|
|
.await
|
|
.unwrap();
|
|
|
|
// Wait for the message to arrive
|
|
if let Some(msgs) = client.wait_for_messages().await {
|
|
for msg in msgs {
|
|
if !msg.message.is_empty() {
|
|
println!(
|
|
"Task {i}: received {:?}",
|
|
String::from_utf8_lossy(&msg.message)
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Disconnect when done; the pool will create a replacement.
|
|
client.disconnect().await;
|
|
println!("Task {i}: done");
|
|
});
|
|
|
|
handles.push(handle);
|
|
}
|
|
|
|
// Wait for all tasks to finish
|
|
for h in handles {
|
|
h.await.unwrap();
|
|
}
|
|
```
|
|
|
|
## Step 4: Observe replenishment
|
|
|
|
After popping all 3 clients, the pool background loop starts creating replacements. Give it time and check:
|
|
|
|
```rust
|
|
// Pool should be replenishing
|
|
println!("\nWaiting for pool to replenish...");
|
|
tokio::time::sleep(Duration::from_secs(15)).await;
|
|
|
|
let count = pool.get_client_count().await;
|
|
println!("Pool has {count} clients ready again");
|
|
```
|
|
|
|
## Step 5: Shut down gracefully
|
|
|
|
```rust
|
|
// Disconnect all remaining clients and stop the background loop
|
|
pool.disconnect_pool().await;
|
|
println!("Pool shut down");
|
|
}
|
|
```
|
|
|
|
## Step 6: Run it
|
|
|
|
```sh
|
|
RUST_LOG=info cargo run
|
|
```
|
|
|
|
You'll see output like:
|
|
|
|
```
|
|
Pool started, waiting for clients to connect...
|
|
Pool has 3 clients ready
|
|
Task 1: got client 8gk4Y...@2xU4d... from pool
|
|
Task 2: got client F3qR7...@9nK2m... from pool
|
|
Task 3: got client A7bN2...@4pL8w... from pool
|
|
Task 1: received "hello from task 1"
|
|
Task 2: received "hello from task 2"
|
|
Task 3: received "hello from task 3"
|
|
Task 1: done
|
|
Task 2: done
|
|
Task 3: done
|
|
|
|
Waiting for pool to replenish...
|
|
Pool has 3 clients ready again
|
|
Pool shut down
|
|
```
|
|
|
|
## When to use the pool
|
|
|
|
The pool is most useful when:
|
|
|
|
- **You have bursty traffic:** many concurrent operations that each need their own client
|
|
- **Latency matters:** you can't afford the several-second delay of creating a client on each request
|
|
- **You're building a service:** an API endpoint that creates a client per request would benefit from pre-warmed clients
|
|
|
|
If your application only ever needs one client at a time, just use `MixnetClient::connect_new()` directly.
|
|
|
|
<Callout type="info">
|
|
The `NymProxyClient` (TcpProxy module) uses a `ClientPool` internally: one client per incoming TCP connection.
|
|
</Callout>
|
|
|
|
## What you've learned
|
|
|
|
- **`ClientPool::new(n)`** creates a pool targeting `n` reserve clients
|
|
- **`pool.start()`** runs a background loop that creates clients whenever the pool is below reserve
|
|
- **`pool.get_mixnet_client()`** pops a client; returns `None` if the pool is empty
|
|
- **Clients are consumed, not returned.** The pool automatically creates replacements
|
|
- **`pool.disconnect_pool()`** shuts down all remaining clients and stops the background loop
|
|
- **Fall back to on-demand creation** when the pool is empty for resilience
|
|
|
|
## Complete code
|
|
|
|
```rust
|
|
use nym_sdk::client_pool::ClientPool;
|
|
use nym_sdk::mixnet::MixnetMessageSender;
|
|
use nym_network_defaults::setup_env;
|
|
use std::time::Duration;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
nym_bin_common::logging::setup_tracing_logger();
|
|
setup_env(None::<String>);
|
|
|
|
let pool = ClientPool::new(3);
|
|
|
|
let pool_bg = pool.clone();
|
|
tokio::spawn(async move {
|
|
pool_bg.start().await.unwrap();
|
|
});
|
|
|
|
println!("Pool started, waiting for clients to connect...");
|
|
tokio::time::sleep(Duration::from_secs(15)).await;
|
|
|
|
let count = pool.get_client_count().await;
|
|
println!("Pool has {count} clients ready");
|
|
|
|
let mut handles = vec![];
|
|
|
|
for i in 1..=3 {
|
|
let pool = pool.clone();
|
|
|
|
let handle = tokio::spawn(async move {
|
|
let mut client = match pool.get_mixnet_client().await {
|
|
Some(c) => {
|
|
println!("Task {i}: got client {} from pool", c.nym_address());
|
|
c
|
|
}
|
|
None => {
|
|
println!("Task {i}: pool empty, creating client on the fly...");
|
|
nym_sdk::mixnet::MixnetClient::connect_new().await.unwrap()
|
|
}
|
|
};
|
|
|
|
let addr = *client.nym_address();
|
|
client
|
|
.send_plain_message(addr, format!("hello from task {i}"))
|
|
.await
|
|
.unwrap();
|
|
|
|
if let Some(msgs) = client.wait_for_messages().await {
|
|
for msg in msgs {
|
|
if !msg.message.is_empty() {
|
|
println!(
|
|
"Task {i}: received {:?}",
|
|
String::from_utf8_lossy(&msg.message)
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
client.disconnect().await;
|
|
println!("Task {i}: done");
|
|
});
|
|
|
|
handles.push(handle);
|
|
}
|
|
|
|
for h in handles {
|
|
h.await.unwrap();
|
|
}
|
|
|
|
println!("\nWaiting for pool to replenish...");
|
|
tokio::time::sleep(Duration::from_secs(15)).await;
|
|
|
|
let count = pool.get_client_count().await;
|
|
println!("Pool has {count} clients ready again");
|
|
|
|
pool.disconnect_pool().await;
|
|
println!("Pool shut down");
|
|
}
|
|
```
|