Compare commits

...

12 Commits

Author SHA1 Message Date
mfahampshire e2b685b73e added info logging 2023-10-23 23:21:28 +02:00
mfahampshire 69b87e8d7c added debug info 2023-10-23 19:09:25 +02:00
mfahampshire 688343ed8b * cleaned up unused imports
* commenting
2023-09-14 21:34:57 +02:00
mfahampshire 5ed8e49cf3 continued 2023-09-14 21:31:22 +02:00
mfahampshire 2a4fe9cc9a changed println! -> info! 2023-09-14 21:31:07 +02:00
mfahampshire cd3dc33707 * using sandbox for identify example
* edited connection.rs to print instead of panic
2023-09-14 20:57:59 +02:00
mfahampshire 466d01eabc temp 2023-09-14 14:26:26 +02:00
mfahampshire 85d2567f97 again increase timeout 2023-09-13 13:26:54 +02:00
mfahampshire 7cbbd352b9 * MyBehaviour networkbehaviour object added
* debug printing
* hit connection.rs panic again: investigating
2023-09-13 12:15:49 +02:00
mfahampshire 56c830cdbd added readme instructions 2023-09-12 17:13:36 +02:00
mfahampshire 46ad61fa9e upped handshake timeout from 5 to 10 secs 2023-09-12 17:00:48 +02:00
mfahampshire 17b22a50fe intermittently working... trying to work out why 2023-09-12 17:00:23 +02:00
6 changed files with 263 additions and 9 deletions
+1 -1
View File
@@ -47,7 +47,7 @@ tokio = { workspace = true, features = ["full"] }
nym-bin-common = { path = "../../../common/bin-common" }
# extra dependencies for libp2p examples
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub"]}
tokio-stream = "0.1.12"
tokio-util = { version = "0.7", features = ["codec"] }
parking_lot = "0.12"
@@ -0,0 +1,59 @@
# rust-libp2p-nym
This repo contains an example implementation of a libp2p transport using the Nym mixnet. It relies on the ChainSafe's fork of libp2p: https://github.com/ChainSafe/rust-libp2p
## Requirements
- Rust 1.68.2
- `Protoc` protobuf compiler. On Debian/Ubuntu distributed via `apt` as `protobuf-compiler` & on Arch/Manjaro via AUR as `[python-protobuf-compiler](https://aur.archlinux.org/packages/python-protobuf-compiler)`.
## Usage
To instantiate a libp2p swarm using the transport:
```rust
use libp2p::core::{muxing::StreamMuxerBox, transport::Transport};
use libp2p::swarm::{keep_alive::Behaviour, SwarmBuilder};
use libp2p::{identity, PeerId};
use nym_sdk::mixnet::MixnetClient;
use rust_libp2p_nym::transport::NymTransport;
use rust_libp2p_nym::test_utils::create_nym_client;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {local_peer_id:?}");
let nym_id = rand::random::<u64>().to_string();
let nym_client = MixnetClient::connect_new().await.unwrap();
let transport = NymTransport::new(nym_client, local_key.clone()).await?;
let _swarm = SwarmBuilder::with_tokio_executor(
transport
.map(|a, _| (a.0, StreamMuxerBox::new(a.1)))
.boxed(),
Behaviour::default(),
local_peer_id,
)
.build();
Ok(())
}
```
## Identify example
To run the example, run the following in one terminal:
```bash
cargo run --example libp2p_identify
# Local peer id: PeerId("12D3KooWLukBu6q2FerWPFhFFhiYaJkhn2sBmceh9UCaXe6hJf5D")
# Listening on "/nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve"
```
In another terminal, run ping again, passing the Nym multiaddress printed previously:
```bash
cargo run --example libp2p_identify -- /nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve
# Local peer id: PeerId("12D3KooWNsuRwG6DHnFJCDR8B3zdvja6xLcfnbtKCsQWJ8eppyWC")
# Dialed /nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve
# Listening on "/nym/2oiRW5C9ivyF3Bo3Gpm4H9EqSKH7A6GpcrRRwVSDVUQ9.EajgCnhzimsP6KskUwKcEj8VFCmHR78s2J6FHWcZ4etR@Fo4f4SQLdoyoGkFae5TpVhRVoXCF8UiypLVGtGjujVPf"
```
@@ -0,0 +1,187 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! identify example
//!
//! In the first terminal window, run:
//!
//! ```sh
//! cargo run
//! ```
//! It will print the [`PeerId`] and the listening addresses, e.g. `Listening on
//! "/nym/<NYM_ADDRESS>"`
//!
//! In the second terminal window, start a new instance of the example with:
//!
//! ```sh
//! cargo run -- /nym/<NYM_ADDRESS>
//! ```
//! The two nodes establish a connection, negotiate the identify protocol
//! and will send each other identify info which is then printed to the console.
use crate::rust_libp2p_nym::transport::NymTransport;
use futures::prelude::*;
use libp2p::swarm::{keep_alive, NetworkBehaviour};
use libp2p::Multiaddr;
use libp2p::{identify, identity, swarm::SwarmEvent, PeerId};
use log::{debug, LevelFilter, info};
use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails};
use std::error::Error;
use nym_network_defaults::setup_env;
use crate::rust_libp2p_nym::connection::Connection;
#[path = "../libp2p_shared/lib.rs"]
mod rust_libp2p_nym;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Warn)
.filter(Some("libp2p_identify"), LevelFilter::Info)
.init();
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id:?}");
// setup a mixnet client using the sandbox testnet instead of mainnet (reliability check)
setup_env(Some("../../../envs/sandbox.env"));
let sandbox_network = NymNetworkDetails::new_from_env();
let _mnemonic = String::from("load from file IRL obviously");
let mixnet_client = MixnetClientBuilder::new_ephemeral()
.network_details(sandbox_network)
.build()
.await?;
let client = mixnet_client.connect_to_mixnet().await?;
let transport = NymTransport::new(client, local_key.clone()).await?;
let listen_addr = transport.listen_addr.clone();
let mut swarm = {
debug!("Running `identify` example using NymTransport");
use libp2p::core::{muxing::StreamMuxerBox, transport::Transport};
use libp2p::swarm::SwarmBuilder;
SwarmBuilder::with_tokio_executor(
transport
.map(|a, _| (a.0, StreamMuxerBox::new(a.1)))
.boxed(),
MyBehaviour {
identify: identify::Behaviour::new(identify::Config::new(
"/demo/".to_string(),
local_key.public(),
)),
keep_alive: keep_alive::Behaviour,
},
local_peer_id,
)
.build()
};
let _ = swarm.listen_on(listen_addr.clone())?;
// Dial the peer identified by the multi-address given as the second
// command-line argument, if any.
if let Some(addr) = std::env::args().nth(1) {
let remote: Multiaddr = addr.parse()?;
swarm.dial(remote)?;
println!("Dialed {addr}")
}
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
// Prints peer id identify info is being sent to.
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Sent {
peer_id,
..
})) => {
info!("Sent self identify info to {peer_id:?}");
}
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Pushed {
peer_id,
..
})) => {
info!("Pushed identify info to {peer_id:?}")
}
// Prints out the info received via the identify event
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Received {
info,
..
})) => {
info!("Received {info:?}")
}
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Error {
peer_id,
error
})) => {
info!("Identify Error: {peer_id:?} {error:?}")
}
SwarmEvent::Dialing(peer_id) => {
info!("Dial attempt from {:?}", peer_id)
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
info!(
"Connection closed with peer: {:?} because: {:?}",
peer_id, cause
)
}
SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
} => {
info!("Incoming connection from: {:?}, with sendback address: {:?}", local_addr, send_back_addr)
}
SwarmEvent::IncomingConnectionError { error, .. } => {
info!("Incoming connection error: {error:?}")
}
SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
concurrent_dial_errors,
endpoint,
..
} => {
info!("Established connection with {peer_id:?} @ {endpoint:?} with {concurrent_dial_errors:?} errors and {num_established:?} connections")
}
SwarmEvent::ExpiredListenAddr {
listener_id,
address,
} => {
info!("Expired listener {listener_id:?} {address:?}")
}
SwarmEvent::ListenerError { listener_id, error } => {
info!("{listener_id:?} stopped listening with {error:?}")
}
other => {
info!("Unhandled incoming: {other:?}")
}
}
}
}
/// Our network behaviour.
///
/// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of
/// pings can be observed.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
identify: identify::Behaviour,
keep_alive: keep_alive::Behaviour,
}
@@ -1,5 +1,5 @@
use libp2p::core::{muxing::StreamMuxerEvent, PeerId, StreamMuxer};
use log::debug;
use log::{debug, info};
use nym_sphinx::addressing::clients::Recipient;
use std::{
collections::{HashMap, HashSet},
@@ -95,6 +95,7 @@ impl Connection {
fn new_outbound_substream(&mut self) -> Result<Substream, Error> {
let substream_id = SubstreamId::generate();
let nonce = self.message_nonce.fetch_add(1, Ordering::SeqCst);
info!("sending OPENREQUEST with id {:#?}", substream_id.clone());
// send the substream open request that requests to open a substream with the given ID
self.mixnet_outbound_tx
@@ -154,7 +155,11 @@ impl Connection {
// notify substream that it's closed
let close_tx = self.substream_close_txs.remove(&substream_id);
close_tx.unwrap().send(()).unwrap();
match close_tx {
Some(error) => { info!("Substream ID {:#?} already dropped, cannot notify (SHOULD PANIC): {:#?}", substream_id.clone(), error) }
other => { info!("{other:?}") }
}
// close_tx.unwrap().send(()).unwrap();
// notify poll_close that the substream is closed
self.close_tx
@@ -200,6 +205,7 @@ impl StreamMuxer for Connection {
while let Poll::Ready(Some(msg)) = self.inbound_rx.poll_recv(cx) {
match msg.message_type {
SubstreamMessageType::OpenRequest => {
info!("recieved OPENREQUEST with ID {:#?}", msg.substream_id);
// create a new substream with the given ID
let substream = self.new_substream(msg.substream_id.clone())?;
let nonce = self.message_nonce.fetch_add(1, Ordering::SeqCst);
@@ -218,24 +224,25 @@ impl StreamMuxer for Connection {
}),
})
.map_err(|e| Error::OutboundSendError(e.to_string()))?;
debug!("wrote OpenResponse for substream: {:?}", &msg.substream_id);
info!("wrote OPENRESPONSE for substream: {:?}", &msg.substream_id); // was debug
// send the substream to our own channel to be returned in poll_inbound
self.inbound_open_tx
.send(substream)
.map_err(|e| Error::InboundSendError(e.to_string()))?;
debug!("new inbound substream: {:?}", &msg.substream_id);
info!("new inbound substream: {:?}", &msg.substream_id); // was debug
}
SubstreamMessageType::OpenResponse => {
if !self.pending_substreams.remove(&msg.substream_id) {
debug!(
info!( // was debug
"SubstreamMessageType::OpenResponse no substream pending for ID: {:?}",
&msg.substream_id
);
}
}
SubstreamMessageType::Close => {
info!("received SubstreamMessageType ******CLOSE****** for substream {:#?}", msg.substream_id.clone());
self.handle_close(msg.substream_id)?;
}
SubstreamMessageType::Data(data) => {
@@ -6,5 +6,5 @@ pub(crate) mod queue;
pub mod substream;
pub mod transport;
/// The deafult timeout secs for [`transport::Upgrade`] future.
const DEFAULT_HANDSHAKE_TIMEOUT_SECS: u64 = 5;
/// The default timeout secs for [`transport::Upgrade`] future.
const DEFAULT_HANDSHAKE_TIMEOUT_SECS: u64 = 20;
@@ -207,7 +207,8 @@ impl AsyncWrite for Substream {
message: Message::TransportMessage(TransportMessage {
nonce,
id: self.connection_id.clone(),
message: SubstreamMessage::new_close(self.substream_id.clone()),
message: SubstreamMessage::
new_close(self.substream_id.clone()),
}),
})
.map_err(|e| {