Compare commits
12 Commits
gear
...
libp2p-identify
| Author | SHA1 | Date | |
|---|---|---|---|
| e2b685b73e | |||
| 69b87e8d7c | |||
| 688343ed8b | |||
| 5ed8e49cf3 | |||
| 2a4fe9cc9a | |||
| cd3dc33707 | |||
| 466d01eabc | |||
| 85d2567f97 | |||
| 7cbbd352b9 | |||
| 56c830cdbd | |||
| 46ad61fa9e | |||
| 17b22a50fe |
@@ -47,7 +47,7 @@ tokio = { workspace = true, features = ["full"] }
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
|
||||
# extra dependencies for libp2p examples
|
||||
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
|
||||
libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub"]}
|
||||
tokio-stream = "0.1.12"
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
parking_lot = "0.12"
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
# rust-libp2p-nym
|
||||
|
||||
This repo contains an example implementation of a libp2p transport using the Nym mixnet. It relies on the ChainSafe's fork of libp2p: https://github.com/ChainSafe/rust-libp2p
|
||||
|
||||
## Requirements
|
||||
|
||||
- Rust 1.68.2
|
||||
- `Protoc` protobuf compiler. On Debian/Ubuntu distributed via `apt` as `protobuf-compiler` & on Arch/Manjaro via AUR as `[python-protobuf-compiler](https://aur.archlinux.org/packages/python-protobuf-compiler)`.
|
||||
|
||||
## Usage
|
||||
|
||||
To instantiate a libp2p swarm using the transport:
|
||||
|
||||
```rust
|
||||
use libp2p::core::{muxing::StreamMuxerBox, transport::Transport};
|
||||
use libp2p::swarm::{keep_alive::Behaviour, SwarmBuilder};
|
||||
use libp2p::{identity, PeerId};
|
||||
use nym_sdk::mixnet::MixnetClient;
|
||||
use rust_libp2p_nym::transport::NymTransport;
|
||||
use rust_libp2p_nym::test_utils::create_nym_client;
|
||||
use std::error::Error;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_peer_id = PeerId::from(local_key.public());
|
||||
info!("Local peer id: {local_peer_id:?}");
|
||||
|
||||
let nym_id = rand::random::<u64>().to_string();
|
||||
let nym_client = MixnetClient::connect_new().await.unwrap();
|
||||
let transport = NymTransport::new(nym_client, local_key.clone()).await?;
|
||||
let _swarm = SwarmBuilder::with_tokio_executor(
|
||||
transport
|
||||
.map(|a, _| (a.0, StreamMuxerBox::new(a.1)))
|
||||
.boxed(),
|
||||
Behaviour::default(),
|
||||
local_peer_id,
|
||||
)
|
||||
.build();
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
## Identify example
|
||||
|
||||
To run the example, run the following in one terminal:
|
||||
```bash
|
||||
cargo run --example libp2p_identify
|
||||
# Local peer id: PeerId("12D3KooWLukBu6q2FerWPFhFFhiYaJkhn2sBmceh9UCaXe6hJf5D")
|
||||
# Listening on "/nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve"
|
||||
```
|
||||
|
||||
In another terminal, run ping again, passing the Nym multiaddress printed previously:
|
||||
```bash
|
||||
cargo run --example libp2p_identify -- /nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve
|
||||
# Local peer id: PeerId("12D3KooWNsuRwG6DHnFJCDR8B3zdvja6xLcfnbtKCsQWJ8eppyWC")
|
||||
# Dialed /nym/FhtkzizQg2JbZ19kGkRKXdjV2QnFbT5ww88ZAKaD4nkF.7Remi4UVYzn1yL3qYtEcQBGh6tzTYxMdYB4uqyHVc5Z4@62F81C9GrHDRja9WCqozemRFSzFPMecY85MbGwn6efve
|
||||
# Listening on "/nym/2oiRW5C9ivyF3Bo3Gpm4H9EqSKH7A6GpcrRRwVSDVUQ9.EajgCnhzimsP6KskUwKcEj8VFCmHR78s2J6FHWcZ4etR@Fo4f4SQLdoyoGkFae5TpVhRVoXCF8UiypLVGtGjujVPf"
|
||||
```
|
||||
@@ -0,0 +1,187 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! identify example
|
||||
//!
|
||||
//! In the first terminal window, run:
|
||||
//!
|
||||
//! ```sh
|
||||
//! cargo run
|
||||
//! ```
|
||||
//! It will print the [`PeerId`] and the listening addresses, e.g. `Listening on
|
||||
//! "/nym/<NYM_ADDRESS>"`
|
||||
//!
|
||||
//! In the second terminal window, start a new instance of the example with:
|
||||
//!
|
||||
//! ```sh
|
||||
//! cargo run -- /nym/<NYM_ADDRESS>
|
||||
//! ```
|
||||
//! The two nodes establish a connection, negotiate the identify protocol
|
||||
//! and will send each other identify info which is then printed to the console.
|
||||
|
||||
use crate::rust_libp2p_nym::transport::NymTransport;
|
||||
use futures::prelude::*;
|
||||
use libp2p::swarm::{keep_alive, NetworkBehaviour};
|
||||
use libp2p::Multiaddr;
|
||||
use libp2p::{identify, identity, swarm::SwarmEvent, PeerId};
|
||||
use log::{debug, LevelFilter, info};
|
||||
use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails};
|
||||
use std::error::Error;
|
||||
use nym_network_defaults::setup_env;
|
||||
use crate::rust_libp2p_nym::connection::Connection;
|
||||
|
||||
#[path = "../libp2p_shared/lib.rs"]
|
||||
mod rust_libp2p_nym;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
pretty_env_logger::formatted_timed_builder()
|
||||
.filter_level(log::LevelFilter::Warn)
|
||||
.filter(Some("libp2p_identify"), LevelFilter::Info)
|
||||
.init();
|
||||
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_peer_id = PeerId::from(local_key.public());
|
||||
println!("Local peer id: {local_peer_id:?}");
|
||||
|
||||
// setup a mixnet client using the sandbox testnet instead of mainnet (reliability check)
|
||||
setup_env(Some("../../../envs/sandbox.env"));
|
||||
let sandbox_network = NymNetworkDetails::new_from_env();
|
||||
let _mnemonic = String::from("load from file IRL obviously");
|
||||
let mixnet_client = MixnetClientBuilder::new_ephemeral()
|
||||
.network_details(sandbox_network)
|
||||
.build()
|
||||
.await?;
|
||||
let client = mixnet_client.connect_to_mixnet().await?;
|
||||
let transport = NymTransport::new(client, local_key.clone()).await?;
|
||||
let listen_addr = transport.listen_addr.clone();
|
||||
|
||||
let mut swarm = {
|
||||
debug!("Running `identify` example using NymTransport");
|
||||
use libp2p::core::{muxing::StreamMuxerBox, transport::Transport};
|
||||
use libp2p::swarm::SwarmBuilder;
|
||||
|
||||
SwarmBuilder::with_tokio_executor(
|
||||
transport
|
||||
.map(|a, _| (a.0, StreamMuxerBox::new(a.1)))
|
||||
.boxed(),
|
||||
MyBehaviour {
|
||||
identify: identify::Behaviour::new(identify::Config::new(
|
||||
"/demo/".to_string(),
|
||||
local_key.public(),
|
||||
)),
|
||||
keep_alive: keep_alive::Behaviour,
|
||||
},
|
||||
local_peer_id,
|
||||
)
|
||||
.build()
|
||||
};
|
||||
|
||||
let _ = swarm.listen_on(listen_addr.clone())?;
|
||||
|
||||
// Dial the peer identified by the multi-address given as the second
|
||||
// command-line argument, if any.
|
||||
if let Some(addr) = std::env::args().nth(1) {
|
||||
let remote: Multiaddr = addr.parse()?;
|
||||
swarm.dial(remote)?;
|
||||
println!("Dialed {addr}")
|
||||
}
|
||||
|
||||
loop {
|
||||
match swarm.select_next_some().await {
|
||||
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"),
|
||||
// Prints peer id identify info is being sent to.
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Sent {
|
||||
peer_id,
|
||||
..
|
||||
})) => {
|
||||
info!("Sent self identify info to {peer_id:?}");
|
||||
}
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Pushed {
|
||||
peer_id,
|
||||
..
|
||||
})) => {
|
||||
info!("Pushed identify info to {peer_id:?}")
|
||||
}
|
||||
// Prints out the info received via the identify event
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Received {
|
||||
info,
|
||||
..
|
||||
})) => {
|
||||
info!("Received {info:?}")
|
||||
}
|
||||
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Error {
|
||||
peer_id,
|
||||
error
|
||||
})) => {
|
||||
info!("Identify Error: {peer_id:?} {error:?}")
|
||||
}
|
||||
SwarmEvent::Dialing(peer_id) => {
|
||||
info!("Dial attempt from {:?}", peer_id)
|
||||
}
|
||||
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
|
||||
info!(
|
||||
"Connection closed with peer: {:?} because: {:?}",
|
||||
peer_id, cause
|
||||
)
|
||||
}
|
||||
SwarmEvent::IncomingConnection {
|
||||
local_addr,
|
||||
send_back_addr,
|
||||
} => {
|
||||
info!("Incoming connection from: {:?}, with sendback address: {:?}", local_addr, send_back_addr)
|
||||
}
|
||||
SwarmEvent::IncomingConnectionError { error, .. } => {
|
||||
info!("Incoming connection error: {error:?}")
|
||||
}
|
||||
SwarmEvent::ConnectionEstablished {
|
||||
peer_id,
|
||||
num_established,
|
||||
concurrent_dial_errors,
|
||||
endpoint,
|
||||
..
|
||||
} => {
|
||||
info!("Established connection with {peer_id:?} @ {endpoint:?} with {concurrent_dial_errors:?} errors and {num_established:?} connections")
|
||||
}
|
||||
SwarmEvent::ExpiredListenAddr {
|
||||
listener_id,
|
||||
address,
|
||||
} => {
|
||||
info!("Expired listener {listener_id:?} {address:?}")
|
||||
}
|
||||
SwarmEvent::ListenerError { listener_id, error } => {
|
||||
info!("{listener_id:?} stopped listening with {error:?}")
|
||||
}
|
||||
other => {
|
||||
info!("Unhandled incoming: {other:?}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Our network behaviour.
|
||||
///
|
||||
/// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of
|
||||
/// pings can be observed.
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct MyBehaviour {
|
||||
identify: identify::Behaviour,
|
||||
keep_alive: keep_alive::Behaviour,
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
use libp2p::core::{muxing::StreamMuxerEvent, PeerId, StreamMuxer};
|
||||
use log::debug;
|
||||
use log::{debug, info};
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -95,6 +95,7 @@ impl Connection {
|
||||
fn new_outbound_substream(&mut self) -> Result<Substream, Error> {
|
||||
let substream_id = SubstreamId::generate();
|
||||
let nonce = self.message_nonce.fetch_add(1, Ordering::SeqCst);
|
||||
info!("sending OPENREQUEST with id {:#?}", substream_id.clone());
|
||||
|
||||
// send the substream open request that requests to open a substream with the given ID
|
||||
self.mixnet_outbound_tx
|
||||
@@ -154,7 +155,11 @@ impl Connection {
|
||||
|
||||
// notify substream that it's closed
|
||||
let close_tx = self.substream_close_txs.remove(&substream_id);
|
||||
close_tx.unwrap().send(()).unwrap();
|
||||
match close_tx {
|
||||
Some(error) => { info!("Substream ID {:#?} already dropped, cannot notify (SHOULD PANIC): {:#?}", substream_id.clone(), error) }
|
||||
other => { info!("{other:?}") }
|
||||
}
|
||||
// close_tx.unwrap().send(()).unwrap();
|
||||
|
||||
// notify poll_close that the substream is closed
|
||||
self.close_tx
|
||||
@@ -200,6 +205,7 @@ impl StreamMuxer for Connection {
|
||||
while let Poll::Ready(Some(msg)) = self.inbound_rx.poll_recv(cx) {
|
||||
match msg.message_type {
|
||||
SubstreamMessageType::OpenRequest => {
|
||||
info!("recieved OPENREQUEST with ID {:#?}", msg.substream_id);
|
||||
// create a new substream with the given ID
|
||||
let substream = self.new_substream(msg.substream_id.clone())?;
|
||||
let nonce = self.message_nonce.fetch_add(1, Ordering::SeqCst);
|
||||
@@ -218,24 +224,25 @@ impl StreamMuxer for Connection {
|
||||
}),
|
||||
})
|
||||
.map_err(|e| Error::OutboundSendError(e.to_string()))?;
|
||||
debug!("wrote OpenResponse for substream: {:?}", &msg.substream_id);
|
||||
info!("wrote OPENRESPONSE for substream: {:?}", &msg.substream_id); // was debug
|
||||
|
||||
// send the substream to our own channel to be returned in poll_inbound
|
||||
self.inbound_open_tx
|
||||
.send(substream)
|
||||
.map_err(|e| Error::InboundSendError(e.to_string()))?;
|
||||
|
||||
debug!("new inbound substream: {:?}", &msg.substream_id);
|
||||
info!("new inbound substream: {:?}", &msg.substream_id); // was debug
|
||||
}
|
||||
SubstreamMessageType::OpenResponse => {
|
||||
if !self.pending_substreams.remove(&msg.substream_id) {
|
||||
debug!(
|
||||
info!( // was debug
|
||||
"SubstreamMessageType::OpenResponse no substream pending for ID: {:?}",
|
||||
&msg.substream_id
|
||||
);
|
||||
}
|
||||
}
|
||||
SubstreamMessageType::Close => {
|
||||
info!("received SubstreamMessageType ******CLOSE****** for substream {:#?}", msg.substream_id.clone());
|
||||
self.handle_close(msg.substream_id)?;
|
||||
}
|
||||
SubstreamMessageType::Data(data) => {
|
||||
|
||||
@@ -6,5 +6,5 @@ pub(crate) mod queue;
|
||||
pub mod substream;
|
||||
pub mod transport;
|
||||
|
||||
/// The deafult timeout secs for [`transport::Upgrade`] future.
|
||||
const DEFAULT_HANDSHAKE_TIMEOUT_SECS: u64 = 5;
|
||||
/// The default timeout secs for [`transport::Upgrade`] future.
|
||||
const DEFAULT_HANDSHAKE_TIMEOUT_SECS: u64 = 20;
|
||||
|
||||
@@ -207,7 +207,8 @@ impl AsyncWrite for Substream {
|
||||
message: Message::TransportMessage(TransportMessage {
|
||||
nonce,
|
||||
id: self.connection_id.clone(),
|
||||
message: SubstreamMessage::new_close(self.substream_id.clone()),
|
||||
message: SubstreamMessage::
|
||||
new_close(self.substream_id.clone()),
|
||||
}),
|
||||
})
|
||||
.map_err(|e| {
|
||||
|
||||
Reference in New Issue
Block a user