Compare commits
32 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4cc63bac1c | |||
| 6519bfa533 | |||
| dc9823334a | |||
| cec05a99f4 | |||
| d487f4d98c | |||
| b9e9809938 | |||
| 9b50188d7d | |||
| 0e3dbece8b | |||
| 052f7649a8 | |||
| 3fde9e648f | |||
| 0b37b9fb1c | |||
| e273bfc25e | |||
| d2ef94f1bd | |||
| 92ab794294 | |||
| 3f0210d56a | |||
| 9b53473bee | |||
| 5fdae14cb9 | |||
| 2f4fad3ce3 | |||
| cc604c5f18 | |||
| d0aece501f | |||
| 22b5670396 | |||
| 79e9399dfe | |||
| 8450df28df | |||
| 0b23d1624f | |||
| 2026ffd61f | |||
| 48e5aecda1 | |||
| d8e484b77e | |||
| d4ca2a7220 | |||
| 2f0074821c | |||
| d5e332ad39 | |||
| 14bf5645b1 | |||
| a11582749c |
@@ -13,6 +13,8 @@ on:
|
||||
required: true
|
||||
default: false
|
||||
type: boolean
|
||||
schedule:
|
||||
- cron: '14 0 * * *'
|
||||
pull_request:
|
||||
paths:
|
||||
- "clients/**"
|
||||
@@ -62,7 +64,9 @@ jobs:
|
||||
- name: Set CARGO_FEATURES
|
||||
run: |
|
||||
echo 'CARGO_FEATURES=--features wireguard' >> $GITHUB_ENV
|
||||
if: github.event_name == 'workflow_dispatch' && inputs.enable_wireguard == true
|
||||
if: >
|
||||
github.event_name == 'schedule' ||
|
||||
(github.event_name == 'workflow_dispatch' && inputs.enable_wireguard == true)
|
||||
|
||||
- name: Install Rust stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
@@ -103,10 +107,10 @@ jobs:
|
||||
target/release/nym-cli
|
||||
retention-days: 30
|
||||
|
||||
# If this was a pull_request, upload to build server
|
||||
# If this was a pull_request or nightly, upload to build server
|
||||
|
||||
- name: Prepare build output
|
||||
if: github.event_name == 'pull_request'
|
||||
# if: github.event_name == 'schedule' || github.event_name == 'pull_request'
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
|
||||
@@ -123,7 +127,7 @@ jobs:
|
||||
cp target/debian/*.deb $OUTPUT_DIR
|
||||
|
||||
- name: Deploy branch to CI www
|
||||
if: github.event_name == 'pull_request'
|
||||
# if: github.event_name == 'schedule' || github.event_name == 'pull_request'
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym binaries
|
||||
name: publish-nym-binaries
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Connect - desktop (MacOS)
|
||||
name: publish-nym-connect-macos
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Connect - desktop (Ubuntu)
|
||||
name: publish-nym-connect-ubuntu
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Connect - desktop (Windows 10)
|
||||
name: publish-nym-connect-win10
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Build release of Nym smart contracts
|
||||
name: publish-nym-contracts
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Wallet (MacOS)
|
||||
name: publish-nym-wallet-macos
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Wallet (Ubuntu)
|
||||
name: publish-nym-wallet-ubuntu
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Nym Wallet (Windows 10)
|
||||
name: publish-nym-wallet-win10
|
||||
on:
|
||||
workflow_dispatch:
|
||||
release:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Publish Typescript SDK
|
||||
name: publish-sdk-npm
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Releases - calculate file hashes
|
||||
name: release-calculate-hash
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
|
||||
Generated
+2
@@ -6732,6 +6732,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bytes",
|
||||
"nym-bin-common",
|
||||
"nym-sphinx",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
@@ -6776,6 +6777,7 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-tun",
|
||||
"tokio-util",
|
||||
"url",
|
||||
]
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::HardcodedTopologyProvider;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use std::fmt::Debug;
|
||||
use std::os::fd::RawFd;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use url::Url;
|
||||
@@ -683,6 +684,7 @@ where
|
||||
packet_stats_reporter.clone(),
|
||||
);
|
||||
|
||||
let gateway_fd = gateway_transceiver.ws_fd();
|
||||
// The message_sender is the transmitter for any component generating sphinx packets
|
||||
// that are to be sent to the mixnet. They are used by cover traffic stream and real
|
||||
// traffic stream.
|
||||
@@ -755,6 +757,7 @@ where
|
||||
received_buffer_request_sender,
|
||||
},
|
||||
},
|
||||
gateway_fd,
|
||||
client_state: ClientState {
|
||||
shared_lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
@@ -770,6 +773,7 @@ pub struct BaseClient {
|
||||
pub client_input: ClientInputStatus,
|
||||
pub client_output: ClientOutputStatus,
|
||||
pub client_state: ClientState,
|
||||
pub gateway_fd: Option<RawFd>,
|
||||
|
||||
pub task_handle: TaskHandle,
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use nym_gateway_client::GatewayClient;
|
||||
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use std::fmt::Debug;
|
||||
use std::os::fd::RawFd;
|
||||
use thiserror::Error;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -25,6 +26,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
|
||||
/// This combines combines the functionalities of being able to send and receive mix packets.
|
||||
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
|
||||
fn gateway_identity(&self) -> identity::PublicKey;
|
||||
fn ws_fd(&self) -> Option<RawFd>;
|
||||
}
|
||||
|
||||
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
|
||||
@@ -66,6 +68,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
(**self).gateway_identity()
|
||||
}
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
(**self).ws_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -112,6 +117,9 @@ where
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
self.gateway_client.gateway_identity()
|
||||
}
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
self.gateway_client.ws_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -187,6 +195,9 @@ mod nonwasm_sealed {
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
self.local_identity
|
||||
}
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -259,4 +270,7 @@ impl GatewayTransceiver for MockGateway {
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
self.dummy_identity
|
||||
}
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,11 +266,11 @@ impl std::ops::Div<f64> for PacketRates {
|
||||
impl PacketRates {
|
||||
fn summary(&self) -> String {
|
||||
format!(
|
||||
"rx: {}/s (real: {}/s), tx: {}/s (real: {}/s)",
|
||||
bibytes2(self.real_packets_received_size + self.cover_packets_received_size),
|
||||
"down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
|
||||
bibytes2(self.real_packets_received_size),
|
||||
bibytes2(self.real_packets_sent_size + self.cover_packets_sent_size),
|
||||
bibytes2(self.real_packets_sent_size),
|
||||
bibytes2(self.cover_packets_received_size),
|
||||
bibytes2(self.cover_packets_sent_size),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -288,6 +288,7 @@ impl PacketRates {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum PacketStatisticsEvent {
|
||||
// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
|
||||
RealPacketSent(usize),
|
||||
@@ -443,7 +444,11 @@ impl PacketStatisticsControl {
|
||||
// Check what the number of retransmissions was during the recording window
|
||||
if let Some((_, start_stats)) = self.history.front() {
|
||||
let delta = self.stats.clone() - start_stats.clone();
|
||||
log::info!("retransmissions: {}", delta.retransmissions_queued,);
|
||||
log::info!(
|
||||
"mix packet retransmissions/real mix packets: {}/{}",
|
||||
delta.retransmissions_queued,
|
||||
delta.real_packets_queued,
|
||||
);
|
||||
} else {
|
||||
log::warn!("Unable to check retransmissions during recording window");
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ use nym_task::TaskClient;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use rand::rngs::OsRng;
|
||||
use std::convert::TryFrom;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::fd::RawFd;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tungstenite::protocol::Message;
|
||||
@@ -34,6 +36,7 @@ use tungstenite::protocol::Message;
|
||||
use tokio::time::sleep;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio_tungstenite::MaybeTlsStream;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_utils::websocket::JSWebsocket;
|
||||
@@ -146,6 +149,21 @@ impl<C, St> GatewayClient<C, St> {
|
||||
self.gateway_identity
|
||||
}
|
||||
|
||||
pub fn ws_fd(&self) -> Option<RawFd> {
|
||||
match &self.connection {
|
||||
SocketState::Available(conn) => match conn.get_ref() {
|
||||
MaybeTlsStream::Plain(stream) => Some(stream.as_raw_fd()),
|
||||
MaybeTlsStream::NativeTls(stream) => Some(stream.as_raw_fd()),
|
||||
&_ => None,
|
||||
},
|
||||
SocketState::PartiallyDelegated(conn) => Some(conn.ws_fd()),
|
||||
_ => {
|
||||
log::warn!("No fd yet");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remaining_bandwidth(&self) -> i64 {
|
||||
self.bandwidth_remaining
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use futures::{SinkExt, StreamExt};
|
||||
use log::*;
|
||||
use nym_gateway_requests::registration::handshake::SharedKeys;
|
||||
use nym_task::TaskClient;
|
||||
use std::os::fd::{AsRawFd, RawFd};
|
||||
use std::sync::Arc;
|
||||
use tungstenite::Message;
|
||||
|
||||
@@ -38,11 +39,15 @@ type WsConn = JSWebsocket;
|
||||
type SplitStreamReceiver = oneshot::Receiver<Result<SplitStream<WsConn>, GatewayClientError>>;
|
||||
|
||||
pub(crate) struct PartiallyDelegated {
|
||||
ws_fd: RawFd,
|
||||
sink_half: SplitSink<WsConn, Message>,
|
||||
delegated_stream: (SplitStreamReceiver, oneshot::Sender<()>),
|
||||
}
|
||||
|
||||
impl PartiallyDelegated {
|
||||
pub fn ws_fd(&self) -> RawFd {
|
||||
self.ws_fd
|
||||
}
|
||||
fn recover_received_plaintexts(ws_msgs: Vec<Message>, shared_key: &SharedKeys) -> Vec<Vec<u8>> {
|
||||
let mut plaintexts = Vec::with_capacity(ws_msgs.len());
|
||||
for ws_msg in ws_msgs {
|
||||
@@ -92,6 +97,11 @@ impl PartiallyDelegated {
|
||||
let (notify_sender, notify_receiver) = oneshot::channel();
|
||||
let (stream_sender, stream_receiver) = oneshot::channel();
|
||||
|
||||
let ws_fd = match conn.get_ref() {
|
||||
MaybeTlsStream::Plain(stream) => stream.as_raw_fd(),
|
||||
MaybeTlsStream::NativeTls(stream) => stream.as_raw_fd(),
|
||||
_ => 0.into(),
|
||||
};
|
||||
let (sink, mut stream) = conn.split();
|
||||
|
||||
let mixnet_receiver_future = async move {
|
||||
@@ -141,6 +151,7 @@ impl PartiallyDelegated {
|
||||
tokio::spawn(mixnet_receiver_future);
|
||||
|
||||
PartiallyDelegated {
|
||||
ws_fd,
|
||||
sink_half: sink,
|
||||
delegated_stream: (stream_receiver, notify_sender),
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::signing::tx_signer::TxSigner;
|
||||
use crate::signing::AccountData;
|
||||
use crate::{DirectSigningReqwestRpcNyxdClient, QueryReqwestRpcNyxdClient, ReqwestRpcClient};
|
||||
use async_trait::async_trait;
|
||||
use cosmrs::cosmwasm;
|
||||
use cosmrs::tendermint::{abci, evidence::Evidence, Genesis};
|
||||
use cosmrs::tx::{Raw, SignDoc};
|
||||
use cosmwasm_std::Addr;
|
||||
@@ -40,7 +39,7 @@ pub use crate::rpc::TendermintRpcClient;
|
||||
pub use coin::Coin;
|
||||
pub use cosmrs::{
|
||||
bank::MsgSend,
|
||||
bip32,
|
||||
bip32, cosmwasm,
|
||||
crypto::PublicKey,
|
||||
query::{PageRequest, PageResponse},
|
||||
tendermint::{
|
||||
|
||||
@@ -11,6 +11,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
bincode = "1.3.3"
|
||||
bytes = "1.5.0"
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
rand = "0.8.5"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -2,7 +2,7 @@ pub mod codec;
|
||||
pub mod request;
|
||||
pub mod response;
|
||||
|
||||
pub const CURRENT_VERSION: u8 = 2;
|
||||
pub const CURRENT_VERSION: u8 = 3;
|
||||
|
||||
fn make_bincode_serializer() -> impl bincode::Options {
|
||||
use bincode::Options;
|
||||
|
||||
@@ -23,6 +23,7 @@ impl IpPacketRequest {
|
||||
reply_to: Recipient,
|
||||
reply_to_hops: Option<u8>,
|
||||
reply_to_avg_mix_delays: Option<f64>,
|
||||
buffer_timeout: Option<u64>,
|
||||
) -> (Self, u64) {
|
||||
let request_id = generate_random();
|
||||
(
|
||||
@@ -34,6 +35,7 @@ impl IpPacketRequest {
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
reply_to_avg_mix_delays,
|
||||
buffer_timeout,
|
||||
}),
|
||||
},
|
||||
request_id,
|
||||
@@ -44,6 +46,7 @@ impl IpPacketRequest {
|
||||
reply_to: Recipient,
|
||||
reply_to_hops: Option<u8>,
|
||||
reply_to_avg_mix_delays: Option<f64>,
|
||||
buffer_timeout: Option<u64>,
|
||||
) -> (Self, u64) {
|
||||
let request_id = generate_random();
|
||||
(
|
||||
@@ -54,6 +57,7 @@ impl IpPacketRequest {
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
reply_to_avg_mix_delays,
|
||||
buffer_timeout,
|
||||
}),
|
||||
},
|
||||
request_id,
|
||||
@@ -74,10 +78,10 @@ impl IpPacketRequest {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
|
||||
pub fn new_data_request(ip_packets: bytes::Bytes) -> Self {
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
data: IpPacketRequestData::Data(DataRequest { ip_packet }),
|
||||
data: IpPacketRequestData::Data(DataRequest { ip_packets }),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +91,8 @@ impl IpPacketRequest {
|
||||
IpPacketRequestData::DynamicConnect(request) => Some(request.request_id),
|
||||
IpPacketRequestData::Disconnect(request) => Some(request.request_id),
|
||||
IpPacketRequestData::Data(_) => None,
|
||||
IpPacketRequestData::Ping(request) => Some(request.request_id),
|
||||
IpPacketRequestData::Health(request) => Some(request.request_id),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +102,8 @@ impl IpPacketRequest {
|
||||
IpPacketRequestData::DynamicConnect(request) => Some(&request.reply_to),
|
||||
IpPacketRequestData::Disconnect(request) => Some(&request.reply_to),
|
||||
IpPacketRequestData::Data(_) => None,
|
||||
IpPacketRequestData::Ping(request) => Some(&request.reply_to),
|
||||
IpPacketRequestData::Health(request) => Some(&request.reply_to),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,35 +127,58 @@ pub enum IpPacketRequestData {
|
||||
DynamicConnect(DynamicConnectRequest),
|
||||
Disconnect(DisconnectRequest),
|
||||
Data(DataRequest),
|
||||
Ping(PingRequest),
|
||||
Health(HealthRequest),
|
||||
}
|
||||
|
||||
// A static connect request is when the client provides the internal IP address it will use on the
|
||||
// ip packet router.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct StaticConnectRequest {
|
||||
pub request_id: u64,
|
||||
|
||||
pub ip: IpAddr,
|
||||
|
||||
// The nym-address the response should be sent back to
|
||||
pub reply_to: Recipient,
|
||||
|
||||
// The number of mix node hops that responses should take, in addition to the entry and exit
|
||||
// node. Zero means only client -> entry -> exit -> client.
|
||||
pub reply_to_hops: Option<u8>,
|
||||
|
||||
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
|
||||
// ip packet router.
|
||||
pub reply_to_avg_mix_delays: Option<f64>,
|
||||
|
||||
// The maximum time in milliseconds the IPR should wait when filling up a mix packet
|
||||
// with ip packets.
|
||||
pub buffer_timeout: Option<u64>,
|
||||
}
|
||||
|
||||
// A dynamic connect request is when the client does not provide the internal IP address it will use
|
||||
// on the ip packet router, and instead requests one to be assigned to it.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct DynamicConnectRequest {
|
||||
pub request_id: u64,
|
||||
|
||||
// The nym-address the response should be sent back to
|
||||
pub reply_to: Recipient,
|
||||
|
||||
// The number of mix node hops that responses should take, in addition to the entry and exit
|
||||
// node. Zero means only client -> entry -> exit -> client.
|
||||
pub reply_to_hops: Option<u8>,
|
||||
|
||||
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
|
||||
// ip packet router.
|
||||
pub reply_to_avg_mix_delays: Option<f64>,
|
||||
|
||||
// The maximum time in milliseconds the IPR should wait when filling up a mix packet
|
||||
// with ip packets.
|
||||
pub buffer_timeout: Option<u64>,
|
||||
}
|
||||
|
||||
// A disconnect request is when the client wants to disconnect from the ip packet router and free
|
||||
// up the allocated IP address.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct DisconnectRequest {
|
||||
pub request_id: u64,
|
||||
@@ -155,9 +186,25 @@ pub struct DisconnectRequest {
|
||||
pub reply_to: Recipient,
|
||||
}
|
||||
|
||||
// A data request is when the client wants to send an IP packet to a destination.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct DataRequest {
|
||||
pub ip_packet: bytes::Bytes,
|
||||
pub ip_packets: bytes::Bytes,
|
||||
}
|
||||
|
||||
// A ping request is when the client wants to check if the ip packet router is still alive.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct PingRequest {
|
||||
pub request_id: u64,
|
||||
// The nym-address the response should be sent back to
|
||||
pub reply_to: Recipient,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct HealthRequest {
|
||||
pub request_id: u64,
|
||||
// The nym-address the response should be sent back to
|
||||
pub reply_to: Recipient,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -175,10 +222,11 @@ mod tests {
|
||||
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
|
||||
reply_to_hops: None,
|
||||
reply_to_avg_mix_delays: None,
|
||||
buffer_timeout: None,
|
||||
},
|
||||
)
|
||||
};
|
||||
assert_eq!(connect.to_bytes().unwrap().len(), 107);
|
||||
assert_eq!(connect.to_bytes().unwrap().len(), 108);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -186,7 +234,7 @@ mod tests {
|
||||
let data = IpPacketRequest {
|
||||
version: 4,
|
||||
data: IpPacketRequestData::Data(DataRequest {
|
||||
ip_packet: bytes::Bytes::from(vec![1u8; 32]),
|
||||
ip_packets: bytes::Bytes::from(vec![1u8; 32]),
|
||||
}),
|
||||
};
|
||||
assert_eq!(data.to_bytes().unwrap().len(), 35);
|
||||
@@ -197,7 +245,7 @@ mod tests {
|
||||
let data = IpPacketRequest {
|
||||
version: 4,
|
||||
data: IpPacketRequestData::Data(DataRequest {
|
||||
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
|
||||
ip_packets: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
|
||||
}),
|
||||
};
|
||||
|
||||
@@ -214,7 +262,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
deserialized.data,
|
||||
IpPacketRequestData::Data(DataRequest {
|
||||
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
|
||||
ip_packets: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -64,6 +64,45 @@ impl IpPacketResponse {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_disconnect_success(request_id: u64, reply_to: Recipient) -> Self {
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
data: IpPacketResponseData::Disconnect(DisconnectResponse {
|
||||
request_id,
|
||||
reply_to,
|
||||
reply: DisconnectResponseReply::Success,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_disconnect_failure(
|
||||
request_id: u64,
|
||||
reply_to: Recipient,
|
||||
reason: DisconnectFailureReason,
|
||||
) -> Self {
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
data: IpPacketResponseData::Disconnect(DisconnectResponse {
|
||||
request_id,
|
||||
reply_to,
|
||||
reply: DisconnectResponseReply::Failure(reason),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_unrequested_disconnect(
|
||||
reply_to: Recipient,
|
||||
reason: UnrequestedDisconnectReason,
|
||||
) -> Self {
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
data: IpPacketResponseData::UnrequestedDisconnect(UnrequestedDisconnect {
|
||||
reply_to,
|
||||
reason,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
|
||||
Self {
|
||||
version: CURRENT_VERSION,
|
||||
@@ -106,7 +145,10 @@ impl IpPacketResponse {
|
||||
IpPacketResponseData::StaticConnect(response) => Some(response.request_id),
|
||||
IpPacketResponseData::DynamicConnect(response) => Some(response.request_id),
|
||||
IpPacketResponseData::Disconnect(response) => Some(response.request_id),
|
||||
IpPacketResponseData::UnrequestedDisconnect(_) => None,
|
||||
IpPacketResponseData::Data(_) => None,
|
||||
IpPacketResponseData::Pong(response) => Some(response.request_id),
|
||||
IpPacketResponseData::Health(response) => Some(response.request_id),
|
||||
IpPacketResponseData::Error(response) => Some(response.request_id),
|
||||
}
|
||||
}
|
||||
@@ -116,7 +158,10 @@ impl IpPacketResponse {
|
||||
IpPacketResponseData::StaticConnect(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::DynamicConnect(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::Disconnect(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::UnrequestedDisconnect(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::Data(_) => None,
|
||||
IpPacketResponseData::Pong(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::Health(response) => Some(&response.reply_to),
|
||||
IpPacketResponseData::Error(response) => Some(&response.reply_to),
|
||||
}
|
||||
}
|
||||
@@ -137,10 +182,28 @@ impl IpPacketResponse {
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum IpPacketResponseData {
|
||||
// Response for a static connect request
|
||||
StaticConnect(StaticConnectResponse),
|
||||
|
||||
// Response for a dynamic connect request
|
||||
DynamicConnect(DynamicConnectResponse),
|
||||
|
||||
// Response for a disconnect initiqated by the client
|
||||
Disconnect(DisconnectResponse),
|
||||
|
||||
// Message from the server that the client got disconnected without the client initiating it
|
||||
UnrequestedDisconnect(UnrequestedDisconnect),
|
||||
|
||||
// Response to a data request
|
||||
Data(DataResponse),
|
||||
|
||||
// Response to ping request
|
||||
Pong(PongResponse),
|
||||
|
||||
// Response for a health request
|
||||
Health(HealthResponse),
|
||||
|
||||
// Error response
|
||||
Error(ErrorResponse),
|
||||
}
|
||||
|
||||
@@ -234,11 +297,48 @@ pub enum DisconnectFailureReason {
|
||||
Other(String),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct UnrequestedDisconnect {
|
||||
pub reply_to: Recipient,
|
||||
pub reason: UnrequestedDisconnectReason,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
|
||||
pub enum UnrequestedDisconnectReason {
|
||||
#[error("client mixnet traffic timeout")]
|
||||
ClientMixnetTrafficTimeout,
|
||||
#[error("client tun traffic timeout")]
|
||||
ClientTunTrafficTimeout,
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DataResponse {
|
||||
pub ip_packet: bytes::Bytes,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct PongResponse {
|
||||
pub request_id: u64,
|
||||
pub reply_to: Recipient,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct HealthResponse {
|
||||
pub request_id: u64,
|
||||
pub reply_to: Recipient,
|
||||
pub reply: HealthResponseReply,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct HealthResponseReply {
|
||||
// Return the binary build information of the IPR
|
||||
pub build_info: nym_bin_common::build_information::BinaryBuildInformationOwned,
|
||||
// Return if the IPR has performed a successful routing test.
|
||||
pub routable: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ErrorResponse {
|
||||
pub request_id: u64,
|
||||
|
||||
@@ -6,7 +6,7 @@ use log::*;
|
||||
use nym_coconut_interface::{Credential, VerificationKey};
|
||||
use nym_validator_client::coconut::all_coconut_api_clients;
|
||||
use nym_validator_client::nym_api::EpochId;
|
||||
use nym_validator_client::nyxd::contract_traits::MultisigQueryClient;
|
||||
use nym_validator_client::nyxd::contract_traits::{MultisigQueryClient, NymContractsProvider};
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use nym_validator_client::{
|
||||
nyxd::{
|
||||
@@ -42,8 +42,33 @@ impl CoconutVerifier {
|
||||
let mut master_keys = HashMap::new();
|
||||
let mut api_clients = HashMap::new();
|
||||
|
||||
// don't make it a hard failure in case we're running on mainnet (where DKG hasn't been deployed yet)
|
||||
if nyxd_client.dkg_contract_address().is_none() {
|
||||
error!(
|
||||
"DKG contract address is not available - no coconut credentials will be redeemable"
|
||||
);
|
||||
return Ok(CoconutVerifier {
|
||||
address,
|
||||
nyxd_client: RwLock::new(nyxd_client),
|
||||
api_clients: Default::default(),
|
||||
master_keys: Default::default(),
|
||||
mix_denom_base,
|
||||
});
|
||||
}
|
||||
|
||||
let Ok(current_epoch) = nyxd_client.get_current_epoch().await else {
|
||||
// another case of somebody putting a placeholder address that doesn't exist
|
||||
error!("the specified DKG contract address is invalid - no coconut credentials will be redeemable");
|
||||
return Ok(CoconutVerifier {
|
||||
address,
|
||||
nyxd_client: RwLock::new(nyxd_client),
|
||||
api_clients: Default::default(),
|
||||
master_keys: Default::default(),
|
||||
mix_denom_base,
|
||||
});
|
||||
};
|
||||
|
||||
// might as well obtain the key for the current epoch, if applicable
|
||||
let current_epoch = nyxd_client.get_current_epoch().await?;
|
||||
if current_epoch.state.is_in_progress() {
|
||||
// note: even though we're constructing clients here, we will NOT be making any network requests
|
||||
let epoch_api_clients =
|
||||
|
||||
Generated
+15
@@ -958,6 +958,12 @@ version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
|
||||
|
||||
[[package]]
|
||||
name = "const-str"
|
||||
version = "0.5.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
|
||||
|
||||
[[package]]
|
||||
name = "constant_time_eq"
|
||||
version = "0.3.0"
|
||||
@@ -3722,6 +3728,7 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_complete",
|
||||
"clap_complete_fig",
|
||||
"const-str",
|
||||
"log",
|
||||
"pretty_env_logger",
|
||||
"schemars",
|
||||
@@ -3762,6 +3769,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.8",
|
||||
"si-scale",
|
||||
"sqlx",
|
||||
"tap",
|
||||
"thiserror",
|
||||
@@ -4138,6 +4146,7 @@ dependencies = [
|
||||
"cfg-if",
|
||||
"dotenvy",
|
||||
"hex-literal",
|
||||
"log",
|
||||
"once_cell",
|
||||
"schemars",
|
||||
"serde",
|
||||
@@ -6221,6 +6230,12 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "si-scale"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44beb68bf488343b13ddbd74d1d5d5e6559a58b6dfaee74eb8d5ed4f7ed7666f"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook"
|
||||
version = "0.3.17"
|
||||
|
||||
@@ -714,6 +714,7 @@ where
|
||||
client_state,
|
||||
reconstructed_receiver,
|
||||
started_client.task_handle,
|
||||
started_client.gateway_fd,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use nym_task::{
|
||||
TaskHandle,
|
||||
};
|
||||
use nym_topology::NymTopology;
|
||||
use std::os::fd::RawFd;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
@@ -44,6 +45,8 @@ pub struct MixnetClient {
|
||||
pub(crate) task_handle: TaskHandle,
|
||||
pub(crate) packet_type: Option<PacketType>,
|
||||
|
||||
pub(crate) gateway_fd: Option<RawFd>,
|
||||
|
||||
// internal state used for the `Stream` implementation
|
||||
_buffered: Vec<ReconstructedMessage>,
|
||||
}
|
||||
@@ -56,6 +59,7 @@ impl MixnetClient {
|
||||
client_state: ClientState,
|
||||
reconstructed_receiver: ReconstructedMessagesReceiver,
|
||||
task_handle: TaskHandle,
|
||||
gateway_fd: Option<RawFd>,
|
||||
packet_type: Option<PacketType>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -65,6 +69,7 @@ impl MixnetClient {
|
||||
client_state,
|
||||
reconstructed_receiver,
|
||||
task_handle,
|
||||
gateway_fd,
|
||||
packet_type,
|
||||
_buffered: Vec::new(),
|
||||
}
|
||||
@@ -97,6 +102,10 @@ impl MixnetClient {
|
||||
&self.nym_address
|
||||
}
|
||||
|
||||
pub fn gateway_fd(&self) -> Option<RawFd> {
|
||||
self.gateway_fd
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
|
||||
/// receive logic in different locations.
|
||||
pub fn split_sender(&self) -> MixnetClientSender {
|
||||
|
||||
@@ -40,6 +40,7 @@ serde_json = { workspace = true }
|
||||
tap.workspace = true
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
url.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use bytes::Bytes;
|
||||
use nym_ip_packet_requests::{codec::MultiIpPacketCodec, response::IpPacketResponse};
|
||||
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
|
||||
|
||||
use crate::{
|
||||
constants::CLIENT_HANDLER_ACTIVITY_TIMEOUT,
|
||||
error::{IpPacketRouterError, Result},
|
||||
util::create_message::create_input_message,
|
||||
};
|
||||
|
||||
// Data flow
|
||||
// Out: mixnet_listener -> decode -> handle_packet -> write_to_tun
|
||||
// In: tun_listener -> [connected_client_handler -> encode] -> mixnet_sender
|
||||
|
||||
// This handler is spawned as a task, and it listens to IP packets passed from the tun_listener,
|
||||
// encodes it, and then sends to mixnet.
|
||||
pub(crate) struct ConnectedClientHandler {
|
||||
nym_address: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
forward_from_tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
close_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
activity_timeout: tokio::time::Interval,
|
||||
encoder: MultiIpPacketCodec,
|
||||
}
|
||||
|
||||
impl ConnectedClientHandler {
|
||||
pub(crate) fn start(
|
||||
reply_to: Recipient,
|
||||
reply_to_hops: Option<u8>,
|
||||
buffer_timeout: std::time::Duration,
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
) -> (
|
||||
tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
tokio::sync::oneshot::Sender<()>,
|
||||
tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
|
||||
let (forward_from_tun_tx, forward_from_tun_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
// Reset so that we don't get the first tick immediately
|
||||
let mut activity_timeout = tokio::time::interval(CLIENT_HANDLER_ACTIVITY_TIMEOUT);
|
||||
activity_timeout.reset();
|
||||
|
||||
let encoder = MultiIpPacketCodec::new(buffer_timeout);
|
||||
|
||||
let connected_client_handler = ConnectedClientHandler {
|
||||
nym_address: reply_to,
|
||||
mix_hops: reply_to_hops,
|
||||
forward_from_tun_rx,
|
||||
mixnet_client_sender,
|
||||
close_rx,
|
||||
activity_timeout,
|
||||
encoder,
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
if let Err(err) = connected_client_handler.run().await {
|
||||
log::error!("connected client handler has failed: {err}")
|
||||
}
|
||||
});
|
||||
|
||||
(forward_from_tun_tx, close_tx, handle)
|
||||
}
|
||||
|
||||
async fn send_packets_to_mixnet(&mut self, packets: Bytes) -> Result<()> {
|
||||
let response_packet = IpPacketResponse::new_ip_packet(packets)
|
||||
.to_bytes()
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })?;
|
||||
let input_message = create_input_message(self.nym_address, response_packet, self.mix_hops);
|
||||
|
||||
self.mixnet_client_sender
|
||||
.send(input_message)
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })
|
||||
}
|
||||
|
||||
async fn handle_buffer_timeout(&mut self, packets: Bytes) -> Result<()> {
|
||||
if !packets.is_empty() {
|
||||
self.send_packets_to_mixnet(packets).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_packet(&mut self, packet: Vec<u8>) -> Result<()> {
|
||||
self.activity_timeout.reset();
|
||||
|
||||
if let Some(bundled_packets) = self.encoder.append_packet(packet.into()) {
|
||||
self.send_packets_to_mixnet(bundled_packets).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut self.close_rx => {
|
||||
log::info!("client handler stopping: received close: {}", self.nym_address);
|
||||
break;
|
||||
},
|
||||
_ = self.activity_timeout.tick() => {
|
||||
log::info!("client handler stopping: activity timeout: {}", self.nym_address);
|
||||
break;
|
||||
},
|
||||
Some(packets) = self.encoder.buffer_timeout() => {
|
||||
if let Err(err) = self.handle_buffer_timeout(packets).await {
|
||||
log::error!("client handler: failed to handle buffer timeout: {err}");
|
||||
}
|
||||
},
|
||||
packet = self.forward_from_tun_rx.recv() => match packet {
|
||||
Some(packet) => {
|
||||
if let Err(err) = self.handle_packet(packet).await {
|
||||
log::error!("client handler: failed to handle packet: {err}");
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::info!("client handler stopping: tun channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!("ConnectedClientHandler: exiting");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -5,5 +5,12 @@ pub const TUN_BASE_NAME: &str = "nymtun";
|
||||
pub const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
|
||||
pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0";
|
||||
|
||||
// We routinely check if any clients needs to be disconnected at this interval
|
||||
pub(crate) const DISCONNECT_TIMER_INTERVAL: Duration = Duration::from_secs(10);
|
||||
pub(crate) const CLIENT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
// We consider a client inactive if it hasn't sent any mixnet packets in this duration
|
||||
pub(crate) const CLIENT_MIXNET_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
// We consider a client handler inactive if it hasn't received any packets from the tun device in
|
||||
// this duration
|
||||
pub(crate) const CLIENT_HANDLER_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
@@ -142,11 +142,9 @@ impl IpPacketRouter {
|
||||
// Channel used by the IpPacketRouter to signal connected and disconnected clients to the
|
||||
// TunListener
|
||||
let (connected_clients, connected_clients_rx) = mixnet_listener::ConnectedClients::new();
|
||||
// let (connected_client_tx, connected_client_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let tun_listener = tun_listener::TunListener {
|
||||
tun_reader,
|
||||
mixnet_client_sender: mixnet_client.split_sender(),
|
||||
task_client: task_handle.get_handle(),
|
||||
connected_clients: connected_clients_rx,
|
||||
};
|
||||
@@ -162,7 +160,6 @@ impl IpPacketRouter {
|
||||
mixnet_client,
|
||||
task_handle,
|
||||
connected_clients,
|
||||
// connected_client_tx,
|
||||
};
|
||||
|
||||
log::info!("The address of this client is: {self_address}");
|
||||
|
||||
@@ -5,6 +5,7 @@ pub use crate::config::Config;
|
||||
pub use ip_packet_router::{IpPacketRouter, OnStartData};
|
||||
|
||||
pub mod config;
|
||||
mod connected_client_handler;
|
||||
mod constants;
|
||||
pub mod error;
|
||||
mod ip_packet_router;
|
||||
|
||||
@@ -3,6 +3,8 @@ mod cli;
|
||||
#[cfg(target_os = "linux")]
|
||||
mod config;
|
||||
#[cfg(target_os = "linux")]
|
||||
mod connected_client_handler;
|
||||
#[cfg(target_os = "linux")]
|
||||
mod constants;
|
||||
#[cfg(target_os = "linux")]
|
||||
mod error;
|
||||
|
||||
@@ -3,8 +3,10 @@ use std::{
|
||||
net::{IpAddr, SocketAddr},
|
||||
};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::StreamExt;
|
||||
use nym_ip_packet_requests::{
|
||||
codec::MultiIpPacketCodec,
|
||||
request::{IpPacketRequest, IpPacketRequestData},
|
||||
response::{
|
||||
DynamicConnectFailureReason, ErrorResponseReply, IpPacketResponse,
|
||||
@@ -14,14 +16,18 @@ use nym_ip_packet_requests::{
|
||||
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::TaskHandle;
|
||||
use tap::TapFallible;
|
||||
#[cfg(target_os = "linux")]
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::codec::Decoder;
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
constants::{CLIENT_INACTIVITY_TIMEOUT, DISCONNECT_TIMER_INTERVAL},
|
||||
connected_client_handler,
|
||||
constants::{CLIENT_MIXNET_INACTIVITY_TIMEOUT, DISCONNECT_TIMER_INTERVAL},
|
||||
error::{IpPacketRouterError, Result},
|
||||
request_filter::{self},
|
||||
tun_listener,
|
||||
util::generate_new_ip,
|
||||
util::{
|
||||
create_message::create_input_message,
|
||||
@@ -29,69 +35,23 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct MixnetListener {
|
||||
pub(crate) _config: Config,
|
||||
pub(crate) request_filter: request_filter::RequestFilter,
|
||||
pub(crate) tun_writer: tokio::io::WriteHalf<tokio_tun::Tun>,
|
||||
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
pub(crate) task_handle: TaskHandle,
|
||||
pub(crate) connected_clients: ConnectedClients,
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectedClients {
|
||||
// The set of connected clients
|
||||
clients: HashMap<IpAddr, ConnectedClient>,
|
||||
connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectedClientsListener {
|
||||
clients: HashMap<IpAddr, ConnectedClient>,
|
||||
pub(crate) connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
impl ConnectedClientsListener {
|
||||
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClient> {
|
||||
self.clients.get(ip)
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, event: ConnectedClientEvent) {
|
||||
match event {
|
||||
ConnectedClientEvent::Connect(connected_event) => {
|
||||
let ConnectEvent {
|
||||
ip,
|
||||
nym_address,
|
||||
mix_hops,
|
||||
} = *connected_event;
|
||||
log::trace!("Connect client: {ip}");
|
||||
self.clients.insert(
|
||||
ip,
|
||||
ConnectedClient {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
last_activity: std::time::Instant::now(),
|
||||
},
|
||||
);
|
||||
}
|
||||
ConnectedClientEvent::Disconnect(DisconnectEvent(ip)) => {
|
||||
log::trace!("Disconnect client: {ip}");
|
||||
self.clients.remove(&ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Notify the tun listener when a new client connects or disconnects
|
||||
tun_listener_connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
impl ConnectedClients {
|
||||
pub(crate) fn new() -> (Self, ConnectedClientsListener) {
|
||||
pub(crate) fn new() -> (Self, tun_listener::ConnectedClientsListener) {
|
||||
let (connected_client_tx, connected_client_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
(
|
||||
Self {
|
||||
clients: Default::default(),
|
||||
connected_client_tx,
|
||||
},
|
||||
ConnectedClientsListener {
|
||||
clients: Default::default(),
|
||||
connected_client_rx,
|
||||
tun_listener_connected_client_tx: connected_client_tx,
|
||||
},
|
||||
tun_listener::ConnectedClientsListener::new(connected_client_rx),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -125,22 +85,38 @@ impl ConnectedClients {
|
||||
.find(|client| client.nym_address == *nym_address)
|
||||
}
|
||||
|
||||
fn connect(&mut self, ip: IpAddr, nym_address: Recipient, mix_hops: Option<u8>) {
|
||||
fn connect(
|
||||
&mut self,
|
||||
ip: IpAddr,
|
||||
nym_address: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
close_tx: tokio::sync::oneshot::Sender<()>,
|
||||
handle: tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
// The map of connected clients that the mixnet listener keeps track of. It monitors
|
||||
// activity and disconnects clients that have been inactive for too long.
|
||||
self.clients.insert(
|
||||
ip,
|
||||
ConnectedClient {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
last_activity: std::time::Instant::now(),
|
||||
close_tx: Some(close_tx),
|
||||
handle,
|
||||
},
|
||||
);
|
||||
self.connected_client_tx
|
||||
// Send the connected client info to the tun listener, which will use it to forward packets
|
||||
// to the connected client handler.
|
||||
self.tun_listener_connected_client_tx
|
||||
.send(ConnectedClientEvent::Connect(Box::new(ConnectEvent {
|
||||
ip,
|
||||
nym_address,
|
||||
mix_hops,
|
||||
forward_from_tun_tx,
|
||||
})))
|
||||
.unwrap();
|
||||
.tap_err(|err| {
|
||||
log::error!("Failed to send connected client event: {err}");
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
|
||||
fn update_activity(&mut self, ip: &IpAddr) -> Result<()> {
|
||||
@@ -152,25 +128,57 @@ impl ConnectedClients {
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect_inactive_clients(&mut self) {
|
||||
let now = std::time::Instant::now();
|
||||
let inactive_clients: Vec<IpAddr> = self
|
||||
.clients
|
||||
.iter()
|
||||
// Identify connected client handlers that have stopped without being told to stop
|
||||
fn get_finished_client_handlers(&mut self) -> Vec<(IpAddr, Recipient)> {
|
||||
self.clients
|
||||
.iter_mut()
|
||||
.filter_map(|(ip, client)| {
|
||||
if now.duration_since(client.last_activity) > CLIENT_INACTIVITY_TIMEOUT {
|
||||
Some(*ip)
|
||||
if client.handle.is_finished() {
|
||||
Some((*ip, client.nym_address))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
for ip in inactive_clients {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn get_inactive_clients(&mut self) -> Vec<(IpAddr, Recipient)> {
|
||||
let now = std::time::Instant::now();
|
||||
self.clients
|
||||
.iter()
|
||||
.filter_map(|(ip, client)| {
|
||||
if now.duration_since(client.last_activity) > CLIENT_MIXNET_INACTIVITY_TIMEOUT {
|
||||
Some((*ip, client.nym_address))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn disconnect_stopped_client_handlers(&mut self, stopped_clients: Vec<(IpAddr, Recipient)>) {
|
||||
for (ip, _) in &stopped_clients {
|
||||
log::info!("Disconnect stopped client: {ip}");
|
||||
self.clients.remove(ip);
|
||||
self.tun_listener_connected_client_tx
|
||||
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ip)))
|
||||
.tap_err(|err| {
|
||||
log::error!("Failed to send disconnect event: {err}");
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect_inactive_clients(&mut self, inactive_clients: Vec<(IpAddr, Recipient)>) {
|
||||
for (ip, _) in &inactive_clients {
|
||||
log::info!("Disconnect inactive client: {ip}");
|
||||
self.clients.remove(&ip);
|
||||
self.connected_client_tx
|
||||
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(ip)))
|
||||
.unwrap();
|
||||
self.clients.remove(ip);
|
||||
self.tun_listener_connected_client_tx
|
||||
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ip)))
|
||||
.tap_err(|err| {
|
||||
log::error!("Failed to send disconnect event: {err}");
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,9 +188,22 @@ impl ConnectedClients {
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectedClient {
|
||||
// The nym address of the connected client that we are communicating with on the other side of
|
||||
// the mixnet
|
||||
pub(crate) nym_address: Recipient,
|
||||
|
||||
// Number of mix node hops that the client has requested to use
|
||||
pub(crate) mix_hops: Option<u8>,
|
||||
|
||||
// Keep track of last activity so we can disconnect inactive clients
|
||||
pub(crate) last_activity: std::time::Instant,
|
||||
|
||||
// Send to connected clients listener to stop. This is option only because we need to take
|
||||
// ownership of it when the client is dropped.
|
||||
pub(crate) close_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
|
||||
// Handle for the connected client handler
|
||||
pub(crate) handle: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl ConnectedClient {
|
||||
@@ -191,12 +212,47 @@ impl ConnectedClient {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectedClient {
|
||||
fn drop(&mut self) {
|
||||
log::debug!("signal to close client: {}", self.nym_address);
|
||||
if let Some(close_tx) = self.close_tx.take() {
|
||||
close_tx.send(()).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type PacketHandleResult = Result<Option<IpPacketResponse>>;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct MixnetListener {
|
||||
// The configuration for the mixnet listener
|
||||
pub(crate) _config: Config,
|
||||
|
||||
// The request filter that we use to check if a packet should be forwarded
|
||||
pub(crate) request_filter: request_filter::RequestFilter,
|
||||
|
||||
// The TUN device that we use to send and receive packets from the internet
|
||||
pub(crate) tun_writer: tokio::io::WriteHalf<tokio_tun::Tun>,
|
||||
|
||||
// The mixnet client that we use to send and receive packets from the mixnet
|
||||
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
|
||||
// The task handle for the main loop
|
||||
pub(crate) task_handle: TaskHandle,
|
||||
|
||||
// The map of connected clients that the mixnet listener keeps track of. It monitors
|
||||
// activity and disconnects clients that have been inactive for too long.
|
||||
pub(crate) connected_clients: ConnectedClients,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
impl MixnetListener {
|
||||
// Receving a static connect request from a client with an IP provided that we assign to them,
|
||||
// if it's available. If it's not available, we send a failure response.
|
||||
async fn on_static_connect_request(
|
||||
&mut self,
|
||||
connect_request: nym_ip_packet_requests::request::StaticConnectRequest,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
) -> PacketHandleResult {
|
||||
log::info!(
|
||||
"Received static connect request from {sender_address}",
|
||||
sender_address = connect_request.reply_to
|
||||
@@ -206,6 +262,8 @@ impl MixnetListener {
|
||||
let requested_ip = connect_request.ip;
|
||||
let reply_to = connect_request.reply_to;
|
||||
let reply_to_hops = connect_request.reply_to_hops;
|
||||
// TODO: add to connect request
|
||||
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
|
||||
// TODO: ignoring reply_to_avg_mix_delays for now
|
||||
|
||||
// Check that the IP is available in the set of connected clients
|
||||
@@ -230,8 +288,25 @@ impl MixnetListener {
|
||||
}
|
||||
(false, false) => {
|
||||
log::info!("Connecting a new client");
|
||||
self.connected_clients
|
||||
.connect(requested_ip, reply_to, reply_to_hops);
|
||||
|
||||
// Spawn the ConnectedClientHandler for the new client
|
||||
let (forward_from_tun_tx, close_tx, handle) =
|
||||
connected_client_handler::ConnectedClientHandler::start(
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
buffer_timeout,
|
||||
self.mixnet_client.split_sender(),
|
||||
);
|
||||
|
||||
// Register the new client in the set of connected clients
|
||||
self.connected_clients.connect(
|
||||
requested_ip,
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_tx,
|
||||
close_tx,
|
||||
handle,
|
||||
);
|
||||
Ok(Some(IpPacketResponse::new_static_connect_success(
|
||||
request_id, reply_to,
|
||||
)))
|
||||
@@ -258,7 +333,7 @@ impl MixnetListener {
|
||||
async fn on_dynamic_connect_request(
|
||||
&mut self,
|
||||
connect_request: nym_ip_packet_requests::request::DynamicConnectRequest,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
) -> PacketHandleResult {
|
||||
log::info!(
|
||||
"Received dynamic connect request from {sender_address}",
|
||||
sender_address = connect_request.reply_to
|
||||
@@ -267,6 +342,8 @@ impl MixnetListener {
|
||||
let request_id = connect_request.request_id;
|
||||
let reply_to = connect_request.reply_to;
|
||||
let reply_to_hops = connect_request.reply_to_hops;
|
||||
// TODO: add to connect request
|
||||
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
|
||||
// TODO: ignoring reply_to_avg_mix_delays for now
|
||||
|
||||
// Check if it's the same client connecting again, then we just reuse the same IP
|
||||
@@ -298,8 +375,24 @@ impl MixnetListener {
|
||||
)));
|
||||
};
|
||||
|
||||
self.connected_clients
|
||||
.connect(new_ip, reply_to, reply_to_hops);
|
||||
// Spawn the ConnectedClientHandler for the new client
|
||||
let (forward_from_tun_tx, close_tx, handle) =
|
||||
connected_client_handler::ConnectedClientHandler::start(
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
buffer_timeout,
|
||||
self.mixnet_client.split_sender(),
|
||||
);
|
||||
|
||||
// Register the new client in the set of connected clients
|
||||
self.connected_clients.connect(
|
||||
new_ip,
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_tx,
|
||||
close_tx,
|
||||
handle,
|
||||
);
|
||||
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
|
||||
request_id, reply_to, new_ip,
|
||||
)))
|
||||
@@ -308,15 +401,12 @@ impl MixnetListener {
|
||||
fn on_disconnect_request(
|
||||
&self,
|
||||
_disconnect_request: nym_ip_packet_requests::request::DisconnectRequest,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
) -> PacketHandleResult {
|
||||
log::info!("Received disconnect request: not implemented, dropping");
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn on_data_request(
|
||||
&mut self,
|
||||
data_request: nym_ip_packet_requests::request::DataRequest,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
async fn handle_packet(&mut self, ip_packet: &Bytes) -> PacketHandleResult {
|
||||
log::trace!("Received data request");
|
||||
|
||||
// We don't forward packets that we are not able to parse. BUT, there might be a good
|
||||
@@ -331,10 +421,10 @@ impl MixnetListener {
|
||||
src_addr,
|
||||
dst_addr,
|
||||
dst,
|
||||
} = parse_packet(&data_request.ip_packet)?;
|
||||
} = parse_packet(ip_packet)?;
|
||||
|
||||
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
|
||||
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
|
||||
log::debug!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
|
||||
|
||||
if let Some(connected_client) = self.connected_clients.get_client_from_ip_mut(&src_addr) {
|
||||
// Keep track of activity so we can disconnect inactive clients
|
||||
@@ -347,7 +437,7 @@ impl MixnetListener {
|
||||
if self.request_filter.check_address(&dst).await {
|
||||
// Forward the packet to the TUN device where it will be routed out to the internet
|
||||
self.tun_writer
|
||||
.write_all(&data_request.ip_packet)
|
||||
.write_all(ip_packet)
|
||||
.await
|
||||
.map_err(|_| IpPacketRouterError::FailedToWritePacketToTun)?;
|
||||
Ok(None)
|
||||
@@ -362,16 +452,31 @@ impl MixnetListener {
|
||||
}
|
||||
} else {
|
||||
// If the client is not connected, just drop the packet silently
|
||||
log::info!("Dropping packet: no connected client for {src_addr}");
|
||||
log::info!("dropping packet from mixnet: no registered client for packet with source: {src_addr}");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_data_request(
|
||||
&mut self,
|
||||
data_request: nym_ip_packet_requests::request::DataRequest,
|
||||
) -> Result<Vec<PacketHandleResult>> {
|
||||
let mut responses = Vec::new();
|
||||
let mut decoder = MultiIpPacketCodec::new(nym_ip_packet_requests::codec::BUFFER_TIMEOUT);
|
||||
let mut bytes = BytesMut::new();
|
||||
bytes.extend_from_slice(&data_request.ip_packets);
|
||||
while let Ok(Some(packet)) = decoder.decode(&mut bytes) {
|
||||
let result = self.handle_packet(&packet).await;
|
||||
responses.push(result);
|
||||
}
|
||||
Ok(responses)
|
||||
}
|
||||
|
||||
fn on_version_mismatch(
|
||||
&self,
|
||||
version: u8,
|
||||
reconstructed: &ReconstructedMessage,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
) -> PacketHandleResult {
|
||||
// If it's possible to parse, do so and return back a response, otherwise just drop
|
||||
let (id, recipient) = IpPacketRequest::from_reconstructed_message(reconstructed)
|
||||
.ok()
|
||||
@@ -393,7 +498,7 @@ impl MixnetListener {
|
||||
async fn on_reconstructed_message(
|
||||
&mut self,
|
||||
reconstructed: ReconstructedMessage,
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
) -> Result<Vec<PacketHandleResult>> {
|
||||
log::debug!(
|
||||
"Received message with sender_tag: {:?}",
|
||||
reconstructed.sender_tag
|
||||
@@ -405,7 +510,7 @@ impl MixnetListener {
|
||||
// backwards compatible.
|
||||
if *version != nym_ip_packet_requests::CURRENT_VERSION {
|
||||
log::info!("Received packet with invalid version: v{version}");
|
||||
return self.on_version_mismatch(*version, &reconstructed);
|
||||
return Ok(vec![self.on_version_mismatch(*version, &reconstructed)]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,23 +519,49 @@ impl MixnetListener {
|
||||
|
||||
match request.data {
|
||||
IpPacketRequestData::StaticConnect(connect_request) => {
|
||||
self.on_static_connect_request(connect_request).await
|
||||
Ok(vec![self.on_static_connect_request(connect_request).await])
|
||||
}
|
||||
IpPacketRequestData::DynamicConnect(connect_request) => {
|
||||
self.on_dynamic_connect_request(connect_request).await
|
||||
Ok(vec![self.on_dynamic_connect_request(connect_request).await])
|
||||
}
|
||||
IpPacketRequestData::Disconnect(disconnect_request) => {
|
||||
self.on_disconnect_request(disconnect_request)
|
||||
Ok(vec![self.on_disconnect_request(disconnect_request)])
|
||||
}
|
||||
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
|
||||
IpPacketRequestData::Ping(_) => {
|
||||
log::info!("Received ping request: not implemented, dropping");
|
||||
Ok(vec![])
|
||||
}
|
||||
IpPacketRequestData::Health(_) => {
|
||||
log::info!("Received health request: not implemented, dropping");
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_disconnect_timer(&mut self) {
|
||||
let stopped_clients = self.connected_clients.get_finished_client_handlers();
|
||||
let inactive_clients = self.connected_clients.get_inactive_clients();
|
||||
|
||||
// TODO: Send disconnect responses to all disconnected clients
|
||||
//for (ip, nym_address) in stopped_clients.iter().chain(disconnected_clients.iter()) {
|
||||
// let response = IpPacketResponse::new_unrequested_disconnect(...)
|
||||
// if let Err(err) = self.handle_response(response).await {
|
||||
// log::error!("Failed to send disconnect response: {err}");
|
||||
// }
|
||||
//}
|
||||
|
||||
self.connected_clients
|
||||
.disconnect_stopped_client_handlers(stopped_clients);
|
||||
self.connected_clients
|
||||
.disconnect_inactive_clients(inactive_clients);
|
||||
}
|
||||
|
||||
// When an incoming mixnet message triggers a response that we send back, such as during
|
||||
// connect handshake.
|
||||
async fn handle_response(&self, response: IpPacketResponse) -> Result<()> {
|
||||
let Some(recipient) = response.recipient() else {
|
||||
log::error!("no recipient in response packet, this should NOT happen!");
|
||||
log::error!("No recipient in response packet, this should NOT happen!");
|
||||
return Err(IpPacketRouterError::NoRecipientInResponse);
|
||||
};
|
||||
|
||||
@@ -452,6 +583,26 @@ impl MixnetListener {
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })
|
||||
}
|
||||
|
||||
// A single incoming request can trigger multiple responses, such as when data requests contain
|
||||
// multiple IP packets.
|
||||
async fn handle_responses(&self, responses: Vec<PacketHandleResult>) {
|
||||
for response in responses {
|
||||
match response {
|
||||
Ok(Some(response)) => {
|
||||
if let Err(err) = self.handle_response(response).await {
|
||||
log::error!("Mixnet listener failed to handle response: {err}");
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Error handling mixnet message: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(mut self) -> Result<()> {
|
||||
let mut task_client = self.task_handle.fork("main_loop");
|
||||
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
|
||||
@@ -462,21 +613,14 @@ impl MixnetListener {
|
||||
log::debug!("IpPacketRouter [main loop]: received shutdown");
|
||||
},
|
||||
_ = disconnect_timer.tick() => {
|
||||
self.connected_clients.disconnect_inactive_clients();
|
||||
self.handle_disconnect_timer();
|
||||
},
|
||||
msg = self.mixnet_client.next() => {
|
||||
if let Some(msg) = msg {
|
||||
match self.on_reconstructed_message(msg).await {
|
||||
Ok(Some(response)) => {
|
||||
if let Err(err) = self.handle_response(response).await {
|
||||
log::error!("Mixnet listener failed to handle response: {err}");
|
||||
}
|
||||
},
|
||||
Ok(None) => {
|
||||
continue;
|
||||
},
|
||||
Ok(responses) => self.handle_responses(responses).await,
|
||||
Err(err) => {
|
||||
log::error!("Error handling mixnet message: {err}");
|
||||
log::error!("Error handling reconstructed mixnet message: {err}");
|
||||
}
|
||||
|
||||
};
|
||||
@@ -502,6 +646,5 @@ pub(crate) struct DisconnectEvent(pub(crate) IpAddr);
|
||||
|
||||
pub(crate) struct ConnectEvent {
|
||||
pub(crate) ip: IpAddr,
|
||||
pub(crate) nym_address: Recipient,
|
||||
pub(crate) mix_hops: Option<u8>,
|
||||
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
@@ -1,22 +1,75 @@
|
||||
use nym_ip_packet_requests::response::IpPacketResponse;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use std::{collections::HashMap, net::IpAddr};
|
||||
|
||||
use nym_task::TaskClient;
|
||||
#[cfg(target_os = "linux")]
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::{
|
||||
error::{IpPacketRouterError, Result},
|
||||
error::Result,
|
||||
mixnet_listener::{self},
|
||||
util::{create_message::create_input_message, parse_ip::parse_dst_addr},
|
||||
util::parse_ip::parse_dst_addr,
|
||||
};
|
||||
|
||||
// The TUN listener keeps a local map of the connected clients that has its state updated by the
|
||||
// mixnet listener. Basically it's just so that we don't have to have mutexes around shared state.
|
||||
// It's even ok if this is slightly out of date
|
||||
pub(crate) struct ConnectedClientMirror {
|
||||
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectedClientsListener {
|
||||
clients: HashMap<IpAddr, ConnectedClientMirror>,
|
||||
connected_client_rx:
|
||||
tokio::sync::mpsc::UnboundedReceiver<mixnet_listener::ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
impl ConnectedClientsListener {
|
||||
pub(crate) fn new(
|
||||
connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<
|
||||
mixnet_listener::ConnectedClientEvent,
|
||||
>,
|
||||
) -> Self {
|
||||
ConnectedClientsListener {
|
||||
clients: HashMap::new(),
|
||||
connected_client_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClientMirror> {
|
||||
self.clients.get(ip)
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, event: mixnet_listener::ConnectedClientEvent) {
|
||||
match event {
|
||||
mixnet_listener::ConnectedClientEvent::Connect(connected_event) => {
|
||||
let mixnet_listener::ConnectEvent {
|
||||
ip,
|
||||
forward_from_tun_tx,
|
||||
} = *connected_event;
|
||||
log::trace!("Connect client: {ip}");
|
||||
self.clients.insert(
|
||||
ip,
|
||||
ConnectedClientMirror {
|
||||
forward_from_tun_tx,
|
||||
},
|
||||
);
|
||||
}
|
||||
mixnet_listener::ConnectedClientEvent::Disconnect(
|
||||
mixnet_listener::DisconnectEvent(ip),
|
||||
) => {
|
||||
log::trace!("Disconnect client: {ip}");
|
||||
self.clients.remove(&ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reads packet from TUN and writes to mixnet client
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct TunListener {
|
||||
pub(crate) tun_reader: tokio::io::ReadHalf<tokio_tun::Tun>,
|
||||
pub(crate) mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
pub(crate) task_client: TaskClient,
|
||||
pub(crate) connected_clients: mixnet_listener::ConnectedClientsListener,
|
||||
pub(crate) connected_clients: ConnectedClientsListener,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -27,26 +80,22 @@ impl TunListener {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Some(mixnet_listener::ConnectedClient {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
..
|
||||
if let Some(ConnectedClientMirror {
|
||||
forward_from_tun_tx,
|
||||
}) = self.connected_clients.get(&dst_addr)
|
||||
{
|
||||
let packet = buf[..len].to_vec();
|
||||
let response_packet = IpPacketResponse::new_ip_packet(packet.into())
|
||||
.to_bytes()
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
|
||||
source: err,
|
||||
})?;
|
||||
let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
|
||||
|
||||
self.mixnet_client_sender
|
||||
.send(input_message)
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
if forward_from_tun_tx.send(packet).is_err() {
|
||||
log::warn!("Failed to forward packet to connected client {dst_addr}: disconnecting it from tun listener");
|
||||
self.connected_clients
|
||||
.update(mixnet_listener::ConnectedClientEvent::Disconnect(
|
||||
mixnet_listener::DisconnectEvent(dst_addr),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
log::info!("No registered nym-address for packet - dropping");
|
||||
log::info!(
|
||||
"dropping packet from network: no registered client for destination: {dst_addr}"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user