Compare commits

...

32 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 4cc63bac1c Fix Box ws_fd 2024-02-13 12:06:10 +02:00
Bogdan-Ștefan Neacşu 6519bfa533 Add log 2024-02-13 11:45:25 +02:00
Bogdan-Ștefan Neacşu dc9823334a Testing 2024-02-12 20:10:09 +02:00
Jon Häggblad cec05a99f4 Tweak packet rate log string 2024-02-12 13:05:30 +01:00
Jon Häggblad d487f4d98c Merge pull request #4389 from nymtech/jon/handle-multiple-ip-packet-in-ipr
Handle multiple IP packets in ip-packet-router
2024-02-12 12:39:40 +01:00
Jon Häggblad b9e9809938 Extract out handle_responses 2024-02-12 12:14:51 +01:00
Jędrzej Stuczyński 9b50188d7d Merge pull request #4391 from nymtech/chore/reexport-types
re-export cosmrs' cosmwasm types
2024-02-12 09:14:33 +00:00
Jon Häggblad 0e3dbece8b Fix unit test 2024-02-12 08:21:32 +01:00
Jędrzej Stuczyński 052f7649a8 re-export cosmrs' cosmwasm types 2024-02-11 18:57:32 +00:00
Jon Häggblad 3fde9e648f Add health request response 2024-02-10 23:53:03 +01:00
Jon Häggblad 0b37b9fb1c Add ping pong request response 2024-02-10 23:35:36 +01:00
Jon Häggblad e273bfc25e Add message for unrequested disconnect on the IPR 2024-02-10 23:27:07 +01:00
Jon Häggblad d2ef94f1bd Add buffer timeout to connect request 2024-02-10 23:13:50 +01:00
Jon Häggblad 92ab794294 Encode packets in connection handler 2024-02-10 23:07:45 +01:00
Jon Häggblad 3f0210d56a Handle incoming multi-ip packets in IPR 2024-02-10 22:40:21 +01:00
Jon Häggblad 9b53473bee Tweak retransmission log info (#4387) 2024-02-09 18:22:25 +01:00
Tommy Verrall 5fdae14cb9 Merge pull request #4385 from nymtech/bugfix/gateway-vk-caching-without-coconut
[bugfix] remove hard failure on dkg contract queries in case it doesn't exist
2024-02-09 18:11:05 +01:00
Jędrzej Stuczyński 2f4fad3ce3 [bugfix] remove hard failure on dkg contract queries in case it doesn't exist 2024-02-09 11:39:27 +00:00
Jon Häggblad cc604c5f18 Merge pull request #4380 from nymtech/jon/ipr-connected-client-handler
Connected client handler in the IPR
2024-02-09 11:37:13 +01:00
Jon Häggblad d0aece501f Add missing deploy step to ci-build-upload-binaries 2024-02-09 11:28:32 +01:00
Jon Häggblad 22b5670396 Update release/publish workflow names to match filenames (#4383) 2024-02-09 11:26:39 +01:00
Jon Häggblad 79e9399dfe Add nightly schedule trigger for ci-build-upload-binaries 2024-02-09 11:17:34 +01:00
Jon Häggblad 8450df28df Tweak logging 2024-02-09 10:58:49 +01:00
Jon Häggblad 0b23d1624f Switch to JoinHandle 2024-02-09 09:49:18 +01:00
Jon Häggblad 2026ffd61f Error logging 2024-02-09 09:49:18 +01:00
Jon Häggblad 48e5aecda1 Don't unwrap on failed to send close signal 2024-02-09 09:49:18 +01:00
Jon Häggblad d8e484b77e Disconnect stopped client handlers 2024-02-09 09:49:18 +01:00
Jon Häggblad d4ca2a7220 Implement drop for client handlers too 2024-02-09 09:49:18 +01:00
Jon Häggblad 2f0074821c Downgrade some logging after checking it works 2024-02-09 09:49:18 +01:00
Jon Häggblad d5e332ad39 Deduplicate and clean up 2024-02-09 09:49:18 +01:00
Jon Häggblad 14bf5645b1 Add missing module 2024-02-09 09:49:18 +01:00
Jon Häggblad a11582749c Add connected_client_handler 2024-02-09 09:49:18 +01:00
34 changed files with 749 additions and 161 deletions
@@ -13,6 +13,8 @@ on:
required: true
default: false
type: boolean
schedule:
- cron: '14 0 * * *'
pull_request:
paths:
- "clients/**"
@@ -62,7 +64,9 @@ jobs:
- name: Set CARGO_FEATURES
run: |
echo 'CARGO_FEATURES=--features wireguard' >> $GITHUB_ENV
if: github.event_name == 'workflow_dispatch' && inputs.enable_wireguard == true
if: >
github.event_name == 'schedule' ||
(github.event_name == 'workflow_dispatch' && inputs.enable_wireguard == true)
- name: Install Rust stable
uses: actions-rs/toolchain@v1
@@ -103,10 +107,10 @@ jobs:
target/release/nym-cli
retention-days: 30
# If this was a pull_request, upload to build server
# If this was a pull_request or nightly, upload to build server
- name: Prepare build output
if: github.event_name == 'pull_request'
# if: github.event_name == 'schedule' || github.event_name == 'pull_request'
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
@@ -123,7 +127,7 @@ jobs:
cp target/debian/*.deb $OUTPUT_DIR
- name: Deploy branch to CI www
if: github.event_name == 'pull_request'
# if: github.event_name == 'schedule' || github.event_name == 'pull_request'
continue-on-error: true
uses: easingthemes/ssh-deploy@main
env:
+1 -1
View File
@@ -1,4 +1,4 @@
name: Publish Nym binaries
name: publish-nym-binaries
on:
workflow_dispatch:
@@ -1,4 +1,4 @@
name: Publish Nym Connect - desktop (MacOS)
name: publish-nym-connect-macos
on:
workflow_dispatch:
release:
@@ -1,4 +1,4 @@
name: Publish Nym Connect - desktop (Ubuntu)
name: publish-nym-connect-ubuntu
on:
workflow_dispatch:
release:
@@ -1,4 +1,4 @@
name: Publish Nym Connect - desktop (Windows 10)
name: publish-nym-connect-win10
on:
workflow_dispatch:
release:
+1 -1
View File
@@ -1,4 +1,4 @@
name: Build release of Nym smart contracts
name: publish-nym-contracts
on:
workflow_dispatch:
release:
@@ -1,4 +1,4 @@
name: Publish Nym Wallet (MacOS)
name: publish-nym-wallet-macos
on:
workflow_dispatch:
release:
@@ -1,4 +1,4 @@
name: Publish Nym Wallet (Ubuntu)
name: publish-nym-wallet-ubuntu
on:
workflow_dispatch:
release:
@@ -1,4 +1,4 @@
name: Publish Nym Wallet (Windows 10)
name: publish-nym-wallet-win10
on:
workflow_dispatch:
release:
+1 -1
View File
@@ -1,4 +1,4 @@
name: Publish Typescript SDK
name: publish-sdk-npm
on:
workflow_dispatch:
+1 -1
View File
@@ -1,4 +1,4 @@
name: Releases - calculate file hashes
name: release-calculate-hash
on:
workflow_call:
Generated
+2
View File
@@ -6732,6 +6732,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"nym-bin-common",
"nym-sphinx",
"rand 0.8.5",
"serde",
@@ -6776,6 +6777,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-tun",
"tokio-util",
"url",
]
@@ -52,6 +52,7 @@ use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
use std::os::fd::RawFd;
use std::path::Path;
use std::sync::Arc;
use url::Url;
@@ -683,6 +684,7 @@ where
packet_stats_reporter.clone(),
);
let gateway_fd = gateway_transceiver.ws_fd();
// The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
@@ -755,6 +757,7 @@ where
received_buffer_request_sender,
},
},
gateway_fd,
client_state: ClientState {
shared_lane_queue_lengths,
reply_controller_sender,
@@ -770,6 +773,7 @@ pub struct BaseClient {
pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub gateway_fd: Option<RawFd>,
pub task_handle: TaskHandle,
}
@@ -8,6 +8,7 @@ use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_sphinx::forwarding::packet::MixPacket;
use std::fmt::Debug;
use std::os::fd::RawFd;
use thiserror::Error;
#[cfg(not(target_arch = "wasm32"))]
@@ -25,6 +26,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
/// This combines combines the functionalities of being able to send and receive mix packets.
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -66,6 +68,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn gateway_identity(&self) -> identity::PublicKey {
(**self).gateway_identity()
}
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -112,6 +117,9 @@ where
fn gateway_identity(&self) -> identity::PublicKey {
self.gateway_client.gateway_identity()
}
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -187,6 +195,9 @@ mod nonwasm_sealed {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
}
fn ws_fd(&self) -> Option<RawFd> {
None
}
}
#[async_trait]
@@ -259,4 +270,7 @@ impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
}
fn ws_fd(&self) -> Option<RawFd> {
None
}
}
@@ -266,11 +266,11 @@ impl std::ops::Div<f64> for PacketRates {
impl PacketRates {
fn summary(&self) -> String {
format!(
"rx: {}/s (real: {}/s), tx: {}/s (real: {}/s)",
bibytes2(self.real_packets_received_size + self.cover_packets_received_size),
"down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
bibytes2(self.real_packets_received_size),
bibytes2(self.real_packets_sent_size + self.cover_packets_sent_size),
bibytes2(self.real_packets_sent_size),
bibytes2(self.cover_packets_received_size),
bibytes2(self.cover_packets_sent_size),
)
}
@@ -288,6 +288,7 @@ impl PacketRates {
}
}
#[derive(Debug)]
pub(crate) enum PacketStatisticsEvent {
// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
RealPacketSent(usize),
@@ -443,7 +444,11 @@ impl PacketStatisticsControl {
// Check what the number of retransmissions was during the recording window
if let Some((_, start_stats)) = self.history.front() {
let delta = self.stats.clone() - start_stats.clone();
log::info!("retransmissions: {}", delta.retransmissions_queued,);
log::info!(
"mix packet retransmissions/real mix packets: {}/{}",
delta.retransmissions_queued,
delta.real_packets_queued,
);
} else {
log::warn!("Unable to check retransmissions during recording window");
}
@@ -26,6 +26,8 @@ use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::sync::Arc;
use std::time::Duration;
use tungstenite::protocol::Message;
@@ -34,6 +36,7 @@ use tungstenite::protocol::Message;
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
use tokio_tungstenite::MaybeTlsStream;
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
@@ -146,6 +149,21 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity
}
pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => match conn.get_ref() {
MaybeTlsStream::Plain(stream) => Some(stream.as_raw_fd()),
MaybeTlsStream::NativeTls(stream) => Some(stream.as_raw_fd()),
&_ => None,
},
SocketState::PartiallyDelegated(conn) => Some(conn.ws_fd()),
_ => {
log::warn!("No fd yet");
None
}
}
}
pub fn remaining_bandwidth(&self) -> i64 {
self.bandwidth_remaining
}
@@ -11,6 +11,7 @@ use futures::{SinkExt, StreamExt};
use log::*;
use nym_gateway_requests::registration::handshake::SharedKeys;
use nym_task::TaskClient;
use std::os::fd::{AsRawFd, RawFd};
use std::sync::Arc;
use tungstenite::Message;
@@ -38,11 +39,15 @@ type WsConn = JSWebsocket;
type SplitStreamReceiver = oneshot::Receiver<Result<SplitStream<WsConn>, GatewayClientError>>;
pub(crate) struct PartiallyDelegated {
ws_fd: RawFd,
sink_half: SplitSink<WsConn, Message>,
delegated_stream: (SplitStreamReceiver, oneshot::Sender<()>),
}
impl PartiallyDelegated {
pub fn ws_fd(&self) -> RawFd {
self.ws_fd
}
fn recover_received_plaintexts(ws_msgs: Vec<Message>, shared_key: &SharedKeys) -> Vec<Vec<u8>> {
let mut plaintexts = Vec::with_capacity(ws_msgs.len());
for ws_msg in ws_msgs {
@@ -92,6 +97,11 @@ impl PartiallyDelegated {
let (notify_sender, notify_receiver) = oneshot::channel();
let (stream_sender, stream_receiver) = oneshot::channel();
let ws_fd = match conn.get_ref() {
MaybeTlsStream::Plain(stream) => stream.as_raw_fd(),
MaybeTlsStream::NativeTls(stream) => stream.as_raw_fd(),
_ => 0.into(),
};
let (sink, mut stream) = conn.split();
let mixnet_receiver_future = async move {
@@ -141,6 +151,7 @@ impl PartiallyDelegated {
tokio::spawn(mixnet_receiver_future);
PartiallyDelegated {
ws_fd,
sink_half: sink,
delegated_stream: (stream_receiver, notify_sender),
}
@@ -16,7 +16,6 @@ use crate::signing::tx_signer::TxSigner;
use crate::signing::AccountData;
use crate::{DirectSigningReqwestRpcNyxdClient, QueryReqwestRpcNyxdClient, ReqwestRpcClient};
use async_trait::async_trait;
use cosmrs::cosmwasm;
use cosmrs::tendermint::{abci, evidence::Evidence, Genesis};
use cosmrs::tx::{Raw, SignDoc};
use cosmwasm_std::Addr;
@@ -40,7 +39,7 @@ pub use crate::rpc::TendermintRpcClient;
pub use coin::Coin;
pub use cosmrs::{
bank::MsgSend,
bip32,
bip32, cosmwasm,
crypto::PublicKey,
query::{PageRequest, PageResponse},
tendermint::{
+1
View File
@@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
nym-bin-common = { path = "../bin-common" }
nym-sphinx = { path = "../nymsphinx" }
rand = "0.8.5"
serde = { workspace = true, features = ["derive"] }
+1 -1
View File
@@ -2,7 +2,7 @@ pub mod codec;
pub mod request;
pub mod response;
pub const CURRENT_VERSION: u8 = 2;
pub const CURRENT_VERSION: u8 = 3;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
+55 -7
View File
@@ -23,6 +23,7 @@ impl IpPacketRequest {
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
buffer_timeout: Option<u64>,
) -> (Self, u64) {
let request_id = generate_random();
(
@@ -34,6 +35,7 @@ impl IpPacketRequest {
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
buffer_timeout,
}),
},
request_id,
@@ -44,6 +46,7 @@ impl IpPacketRequest {
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
buffer_timeout: Option<u64>,
) -> (Self, u64) {
let request_id = generate_random();
(
@@ -54,6 +57,7 @@ impl IpPacketRequest {
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
buffer_timeout,
}),
},
request_id,
@@ -74,10 +78,10 @@ impl IpPacketRequest {
)
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
pub fn new_data_request(ip_packets: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::Data(DataRequest { ip_packet }),
data: IpPacketRequestData::Data(DataRequest { ip_packets }),
}
}
@@ -87,6 +91,8 @@ impl IpPacketRequest {
IpPacketRequestData::DynamicConnect(request) => Some(request.request_id),
IpPacketRequestData::Disconnect(request) => Some(request.request_id),
IpPacketRequestData::Data(_) => None,
IpPacketRequestData::Ping(request) => Some(request.request_id),
IpPacketRequestData::Health(request) => Some(request.request_id),
}
}
@@ -96,6 +102,8 @@ impl IpPacketRequest {
IpPacketRequestData::DynamicConnect(request) => Some(&request.reply_to),
IpPacketRequestData::Disconnect(request) => Some(&request.reply_to),
IpPacketRequestData::Data(_) => None,
IpPacketRequestData::Ping(request) => Some(&request.reply_to),
IpPacketRequestData::Health(request) => Some(&request.reply_to),
}
}
@@ -119,35 +127,58 @@ pub enum IpPacketRequestData {
DynamicConnect(DynamicConnectRequest),
Disconnect(DisconnectRequest),
Data(DataRequest),
Ping(PingRequest),
Health(HealthRequest),
}
// A static connect request is when the client provides the internal IP address it will use on the
// ip packet router.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StaticConnectRequest {
pub request_id: u64,
pub ip: IpAddr,
// The nym-address the response should be sent back to
pub reply_to: Recipient,
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
// ip packet router.
pub reply_to_avg_mix_delays: Option<f64>,
// The maximum time in milliseconds the IPR should wait when filling up a mix packet
// with ip packets.
pub buffer_timeout: Option<u64>,
}
// A dynamic connect request is when the client does not provide the internal IP address it will use
// on the ip packet router, and instead requests one to be assigned to it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DynamicConnectRequest {
pub request_id: u64,
// The nym-address the response should be sent back to
pub reply_to: Recipient,
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
// ip packet router.
pub reply_to_avg_mix_delays: Option<f64>,
// The maximum time in milliseconds the IPR should wait when filling up a mix packet
// with ip packets.
pub buffer_timeout: Option<u64>,
}
// A disconnect request is when the client wants to disconnect from the ip packet router and free
// up the allocated IP address.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DisconnectRequest {
pub request_id: u64,
@@ -155,9 +186,25 @@ pub struct DisconnectRequest {
pub reply_to: Recipient,
}
// A data request is when the client wants to send an IP packet to a destination.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DataRequest {
pub ip_packet: bytes::Bytes,
pub ip_packets: bytes::Bytes,
}
// A ping request is when the client wants to check if the ip packet router is still alive.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PingRequest {
pub request_id: u64,
// The nym-address the response should be sent back to
pub reply_to: Recipient,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct HealthRequest {
pub request_id: u64,
// The nym-address the response should be sent back to
pub reply_to: Recipient,
}
#[cfg(test)]
@@ -175,10 +222,11 @@ mod tests {
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
reply_to_hops: None,
reply_to_avg_mix_delays: None,
buffer_timeout: None,
},
)
};
assert_eq!(connect.to_bytes().unwrap().len(), 107);
assert_eq!(connect.to_bytes().unwrap().len(), 108);
}
#[test]
@@ -186,7 +234,7 @@ mod tests {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1u8; 32]),
ip_packets: bytes::Bytes::from(vec![1u8; 32]),
}),
};
assert_eq!(data.to_bytes().unwrap().len(), 35);
@@ -197,7 +245,7 @@ mod tests {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
ip_packets: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
}),
};
@@ -214,7 +262,7 @@ mod tests {
assert_eq!(
deserialized.data,
IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
ip_packets: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
})
);
}
+100
View File
@@ -64,6 +64,45 @@ impl IpPacketResponse {
}
}
pub fn new_disconnect_success(request_id: u64, reply_to: Recipient) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::Disconnect(DisconnectResponse {
request_id,
reply_to,
reply: DisconnectResponseReply::Success,
}),
}
}
pub fn new_disconnect_failure(
request_id: u64,
reply_to: Recipient,
reason: DisconnectFailureReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::Disconnect(DisconnectResponse {
request_id,
reply_to,
reply: DisconnectResponseReply::Failure(reason),
}),
}
}
pub fn new_unrequested_disconnect(
reply_to: Recipient,
reason: UnrequestedDisconnectReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::UnrequestedDisconnect(UnrequestedDisconnect {
reply_to,
reason,
}),
}
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
@@ -106,7 +145,10 @@ impl IpPacketResponse {
IpPacketResponseData::StaticConnect(response) => Some(response.request_id),
IpPacketResponseData::DynamicConnect(response) => Some(response.request_id),
IpPacketResponseData::Disconnect(response) => Some(response.request_id),
IpPacketResponseData::UnrequestedDisconnect(_) => None,
IpPacketResponseData::Data(_) => None,
IpPacketResponseData::Pong(response) => Some(response.request_id),
IpPacketResponseData::Health(response) => Some(response.request_id),
IpPacketResponseData::Error(response) => Some(response.request_id),
}
}
@@ -116,7 +158,10 @@ impl IpPacketResponse {
IpPacketResponseData::StaticConnect(response) => Some(&response.reply_to),
IpPacketResponseData::DynamicConnect(response) => Some(&response.reply_to),
IpPacketResponseData::Disconnect(response) => Some(&response.reply_to),
IpPacketResponseData::UnrequestedDisconnect(response) => Some(&response.reply_to),
IpPacketResponseData::Data(_) => None,
IpPacketResponseData::Pong(response) => Some(&response.reply_to),
IpPacketResponseData::Health(response) => Some(&response.reply_to),
IpPacketResponseData::Error(response) => Some(&response.reply_to),
}
}
@@ -137,10 +182,28 @@ impl IpPacketResponse {
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum IpPacketResponseData {
// Response for a static connect request
StaticConnect(StaticConnectResponse),
// Response for a dynamic connect request
DynamicConnect(DynamicConnectResponse),
// Response for a disconnect initiqated by the client
Disconnect(DisconnectResponse),
// Message from the server that the client got disconnected without the client initiating it
UnrequestedDisconnect(UnrequestedDisconnect),
// Response to a data request
Data(DataResponse),
// Response to ping request
Pong(PongResponse),
// Response for a health request
Health(HealthResponse),
// Error response
Error(ErrorResponse),
}
@@ -234,11 +297,48 @@ pub enum DisconnectFailureReason {
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnrequestedDisconnect {
pub reply_to: Recipient,
pub reason: UnrequestedDisconnectReason,
}
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum UnrequestedDisconnectReason {
#[error("client mixnet traffic timeout")]
ClientMixnetTrafficTimeout,
#[error("client tun traffic timeout")]
ClientTunTrafficTimeout,
#[error("{0}")]
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataResponse {
pub ip_packet: bytes::Bytes,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PongResponse {
pub request_id: u64,
pub reply_to: Recipient,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HealthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: HealthResponseReply,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HealthResponseReply {
// Return the binary build information of the IPR
pub build_info: nym_bin_common::build_information::BinaryBuildInformationOwned,
// Return if the IPR has performed a successful routing test.
pub routable: Option<bool>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub request_id: u64,
@@ -6,7 +6,7 @@ use log::*;
use nym_coconut_interface::{Credential, VerificationKey};
use nym_validator_client::coconut::all_coconut_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::MultisigQueryClient;
use nym_validator_client::nyxd::contract_traits::{MultisigQueryClient, NymContractsProvider};
use nym_validator_client::nyxd::AccountId;
use nym_validator_client::{
nyxd::{
@@ -42,8 +42,33 @@ impl CoconutVerifier {
let mut master_keys = HashMap::new();
let mut api_clients = HashMap::new();
// don't make it a hard failure in case we're running on mainnet (where DKG hasn't been deployed yet)
if nyxd_client.dkg_contract_address().is_none() {
error!(
"DKG contract address is not available - no coconut credentials will be redeemable"
);
return Ok(CoconutVerifier {
address,
nyxd_client: RwLock::new(nyxd_client),
api_clients: Default::default(),
master_keys: Default::default(),
mix_denom_base,
});
}
let Ok(current_epoch) = nyxd_client.get_current_epoch().await else {
// another case of somebody putting a placeholder address that doesn't exist
error!("the specified DKG contract address is invalid - no coconut credentials will be redeemable");
return Ok(CoconutVerifier {
address,
nyxd_client: RwLock::new(nyxd_client),
api_clients: Default::default(),
master_keys: Default::default(),
mix_denom_base,
});
};
// might as well obtain the key for the current epoch, if applicable
let current_epoch = nyxd_client.get_current_epoch().await?;
if current_epoch.state.is_in_progress() {
// note: even though we're constructing clients here, we will NOT be making any network requests
let epoch_api_clients =
+15
View File
@@ -958,6 +958,12 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
[[package]]
name = "const-str"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
[[package]]
name = "constant_time_eq"
version = "0.3.0"
@@ -3722,6 +3728,7 @@ dependencies = [
"clap",
"clap_complete",
"clap_complete_fig",
"const-str",
"log",
"pretty_env_logger",
"schemars",
@@ -3762,6 +3769,7 @@ dependencies = [
"serde",
"serde_json",
"sha2 0.10.8",
"si-scale",
"sqlx",
"tap",
"thiserror",
@@ -4138,6 +4146,7 @@ dependencies = [
"cfg-if",
"dotenvy",
"hex-literal",
"log",
"once_cell",
"schemars",
"serde",
@@ -6221,6 +6230,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "si-scale"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44beb68bf488343b13ddbd74d1d5d5e6559a58b6dfaee74eb8d5ed4f7ed7666f"
[[package]]
name = "signal-hook"
version = "0.3.17"
+1
View File
@@ -714,6 +714,7 @@ where
client_state,
reconstructed_receiver,
started_client.task_handle,
started_client.gateway_fd,
None,
))
}
@@ -16,6 +16,7 @@ use nym_task::{
TaskHandle,
};
use nym_topology::NymTopology;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -44,6 +45,8 @@ pub struct MixnetClient {
pub(crate) task_handle: TaskHandle,
pub(crate) packet_type: Option<PacketType>,
pub(crate) gateway_fd: Option<RawFd>,
// internal state used for the `Stream` implementation
_buffered: Vec<ReconstructedMessage>,
}
@@ -56,6 +59,7 @@ impl MixnetClient {
client_state: ClientState,
reconstructed_receiver: ReconstructedMessagesReceiver,
task_handle: TaskHandle,
gateway_fd: Option<RawFd>,
packet_type: Option<PacketType>,
) -> Self {
Self {
@@ -65,6 +69,7 @@ impl MixnetClient {
client_state,
reconstructed_receiver,
task_handle,
gateway_fd,
packet_type,
_buffered: Vec::new(),
}
@@ -97,6 +102,10 @@ impl MixnetClient {
&self.nym_address
}
pub fn gateway_fd(&self) -> Option<RawFd> {
self.gateway_fd
}
/// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
/// receive logic in different locations.
pub fn split_sender(&self) -> MixnetClientSender {
@@ -40,6 +40,7 @@ serde_json = { workspace = true }
tap.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-util = { workspace = true, features = ["codec"] }
url.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
@@ -0,0 +1,132 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use bytes::Bytes;
use nym_ip_packet_requests::{codec::MultiIpPacketCodec, response::IpPacketResponse};
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
use crate::{
constants::CLIENT_HANDLER_ACTIVITY_TIMEOUT,
error::{IpPacketRouterError, Result},
util::create_message::create_input_message,
};
// Data flow
// Out: mixnet_listener -> decode -> handle_packet -> write_to_tun
// In: tun_listener -> [connected_client_handler -> encode] -> mixnet_sender
// This handler is spawned as a task, and it listens to IP packets passed from the tun_listener,
// encodes it, and then sends to mixnet.
pub(crate) struct ConnectedClientHandler {
nym_address: Recipient,
mix_hops: Option<u8>,
forward_from_tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
close_rx: tokio::sync::oneshot::Receiver<()>,
activity_timeout: tokio::time::Interval,
encoder: MultiIpPacketCodec,
}
impl ConnectedClientHandler {
pub(crate) fn start(
reply_to: Recipient,
reply_to_hops: Option<u8>,
buffer_timeout: std::time::Duration,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
) -> (
tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
tokio::sync::oneshot::Sender<()>,
tokio::task::JoinHandle<()>,
) {
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
let (forward_from_tun_tx, forward_from_tun_rx) = tokio::sync::mpsc::unbounded_channel();
// Reset so that we don't get the first tick immediately
let mut activity_timeout = tokio::time::interval(CLIENT_HANDLER_ACTIVITY_TIMEOUT);
activity_timeout.reset();
let encoder = MultiIpPacketCodec::new(buffer_timeout);
let connected_client_handler = ConnectedClientHandler {
nym_address: reply_to,
mix_hops: reply_to_hops,
forward_from_tun_rx,
mixnet_client_sender,
close_rx,
activity_timeout,
encoder,
};
let handle = tokio::spawn(async move {
if let Err(err) = connected_client_handler.run().await {
log::error!("connected client handler has failed: {err}")
}
});
(forward_from_tun_tx, close_tx, handle)
}
async fn send_packets_to_mixnet(&mut self, packets: Bytes) -> Result<()> {
let response_packet = IpPacketResponse::new_ip_packet(packets)
.to_bytes()
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })?;
let input_message = create_input_message(self.nym_address, response_packet, self.mix_hops);
self.mixnet_client_sender
.send(input_message)
.await
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })
}
async fn handle_buffer_timeout(&mut self, packets: Bytes) -> Result<()> {
if !packets.is_empty() {
self.send_packets_to_mixnet(packets).await
} else {
Ok(())
}
}
async fn handle_packet(&mut self, packet: Vec<u8>) -> Result<()> {
self.activity_timeout.reset();
if let Some(bundled_packets) = self.encoder.append_packet(packet.into()) {
self.send_packets_to_mixnet(bundled_packets).await
} else {
Ok(())
}
}
async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
_ = &mut self.close_rx => {
log::info!("client handler stopping: received close: {}", self.nym_address);
break;
},
_ = self.activity_timeout.tick() => {
log::info!("client handler stopping: activity timeout: {}", self.nym_address);
break;
},
Some(packets) = self.encoder.buffer_timeout() => {
if let Err(err) = self.handle_buffer_timeout(packets).await {
log::error!("client handler: failed to handle buffer timeout: {err}");
}
},
packet = self.forward_from_tun_rx.recv() => match packet {
Some(packet) => {
if let Err(err) = self.handle_packet(packet).await {
log::error!("client handler: failed to handle packet: {err}");
}
},
None => {
log::info!("client handler stopping: tun channel closed");
break;
}
},
}
}
log::debug!("ConnectedClientHandler: exiting");
Ok(())
}
}
@@ -5,5 +5,12 @@ pub const TUN_BASE_NAME: &str = "nymtun";
pub const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0";
// We routinely check if any clients needs to be disconnected at this interval
pub(crate) const DISCONNECT_TIMER_INTERVAL: Duration = Duration::from_secs(10);
pub(crate) const CLIENT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
// We consider a client inactive if it hasn't sent any mixnet packets in this duration
pub(crate) const CLIENT_MIXNET_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
// We consider a client handler inactive if it hasn't received any packets from the tun device in
// this duration
pub(crate) const CLIENT_HANDLER_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
@@ -142,11 +142,9 @@ impl IpPacketRouter {
// Channel used by the IpPacketRouter to signal connected and disconnected clients to the
// TunListener
let (connected_clients, connected_clients_rx) = mixnet_listener::ConnectedClients::new();
// let (connected_client_tx, connected_client_rx) = tokio::sync::mpsc::unbounded_channel();
let tun_listener = tun_listener::TunListener {
tun_reader,
mixnet_client_sender: mixnet_client.split_sender(),
task_client: task_handle.get_handle(),
connected_clients: connected_clients_rx,
};
@@ -162,7 +160,6 @@ impl IpPacketRouter {
mixnet_client,
task_handle,
connected_clients,
// connected_client_tx,
};
log::info!("The address of this client is: {self_address}");
@@ -5,6 +5,7 @@ pub use crate::config::Config;
pub use ip_packet_router::{IpPacketRouter, OnStartData};
pub mod config;
mod connected_client_handler;
mod constants;
pub mod error;
mod ip_packet_router;
@@ -3,6 +3,8 @@ mod cli;
#[cfg(target_os = "linux")]
mod config;
#[cfg(target_os = "linux")]
mod connected_client_handler;
#[cfg(target_os = "linux")]
mod constants;
#[cfg(target_os = "linux")]
mod error;
@@ -3,8 +3,10 @@ use std::{
net::{IpAddr, SocketAddr},
};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use nym_ip_packet_requests::{
codec::MultiIpPacketCodec,
request::{IpPacketRequest, IpPacketRequestData},
response::{
DynamicConnectFailureReason, ErrorResponseReply, IpPacketResponse,
@@ -14,14 +16,18 @@ use nym_ip_packet_requests::{
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::TaskHandle;
use tap::TapFallible;
#[cfg(target_os = "linux")]
use tokio::io::AsyncWriteExt;
use tokio_util::codec::Decoder;
use crate::{
config::Config,
constants::{CLIENT_INACTIVITY_TIMEOUT, DISCONNECT_TIMER_INTERVAL},
connected_client_handler,
constants::{CLIENT_MIXNET_INACTIVITY_TIMEOUT, DISCONNECT_TIMER_INTERVAL},
error::{IpPacketRouterError, Result},
request_filter::{self},
tun_listener,
util::generate_new_ip,
util::{
create_message::create_input_message,
@@ -29,69 +35,23 @@ use crate::{
},
};
#[cfg(target_os = "linux")]
pub(crate) struct MixnetListener {
pub(crate) _config: Config,
pub(crate) request_filter: request_filter::RequestFilter,
pub(crate) tun_writer: tokio::io::WriteHalf<tokio_tun::Tun>,
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
pub(crate) task_handle: TaskHandle,
pub(crate) connected_clients: ConnectedClients,
}
pub(crate) struct ConnectedClients {
// The set of connected clients
clients: HashMap<IpAddr, ConnectedClient>,
connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
}
pub(crate) struct ConnectedClientsListener {
clients: HashMap<IpAddr, ConnectedClient>,
pub(crate) connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<ConnectedClientEvent>,
}
impl ConnectedClientsListener {
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClient> {
self.clients.get(ip)
}
pub(crate) fn update(&mut self, event: ConnectedClientEvent) {
match event {
ConnectedClientEvent::Connect(connected_event) => {
let ConnectEvent {
ip,
nym_address,
mix_hops,
} = *connected_event;
log::trace!("Connect client: {ip}");
self.clients.insert(
ip,
ConnectedClient {
nym_address,
mix_hops,
last_activity: std::time::Instant::now(),
},
);
}
ConnectedClientEvent::Disconnect(DisconnectEvent(ip)) => {
log::trace!("Disconnect client: {ip}");
self.clients.remove(&ip);
}
}
}
// Notify the tun listener when a new client connects or disconnects
tun_listener_connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
}
impl ConnectedClients {
pub(crate) fn new() -> (Self, ConnectedClientsListener) {
pub(crate) fn new() -> (Self, tun_listener::ConnectedClientsListener) {
let (connected_client_tx, connected_client_rx) = tokio::sync::mpsc::unbounded_channel();
(
Self {
clients: Default::default(),
connected_client_tx,
},
ConnectedClientsListener {
clients: Default::default(),
connected_client_rx,
tun_listener_connected_client_tx: connected_client_tx,
},
tun_listener::ConnectedClientsListener::new(connected_client_rx),
)
}
@@ -125,22 +85,38 @@ impl ConnectedClients {
.find(|client| client.nym_address == *nym_address)
}
fn connect(&mut self, ip: IpAddr, nym_address: Recipient, mix_hops: Option<u8>) {
fn connect(
&mut self,
ip: IpAddr,
nym_address: Recipient,
mix_hops: Option<u8>,
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
close_tx: tokio::sync::oneshot::Sender<()>,
handle: tokio::task::JoinHandle<()>,
) {
// The map of connected clients that the mixnet listener keeps track of. It monitors
// activity and disconnects clients that have been inactive for too long.
self.clients.insert(
ip,
ConnectedClient {
nym_address,
mix_hops,
last_activity: std::time::Instant::now(),
close_tx: Some(close_tx),
handle,
},
);
self.connected_client_tx
// Send the connected client info to the tun listener, which will use it to forward packets
// to the connected client handler.
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Connect(Box::new(ConnectEvent {
ip,
nym_address,
mix_hops,
forward_from_tun_tx,
})))
.unwrap();
.tap_err(|err| {
log::error!("Failed to send connected client event: {err}");
})
.ok();
}
fn update_activity(&mut self, ip: &IpAddr) -> Result<()> {
@@ -152,25 +128,57 @@ impl ConnectedClients {
}
}
fn disconnect_inactive_clients(&mut self) {
let now = std::time::Instant::now();
let inactive_clients: Vec<IpAddr> = self
.clients
.iter()
// Identify connected client handlers that have stopped without being told to stop
fn get_finished_client_handlers(&mut self) -> Vec<(IpAddr, Recipient)> {
self.clients
.iter_mut()
.filter_map(|(ip, client)| {
if now.duration_since(client.last_activity) > CLIENT_INACTIVITY_TIMEOUT {
Some(*ip)
if client.handle.is_finished() {
Some((*ip, client.nym_address))
} else {
None
}
})
.collect();
for ip in inactive_clients {
.collect()
}
fn get_inactive_clients(&mut self) -> Vec<(IpAddr, Recipient)> {
let now = std::time::Instant::now();
self.clients
.iter()
.filter_map(|(ip, client)| {
if now.duration_since(client.last_activity) > CLIENT_MIXNET_INACTIVITY_TIMEOUT {
Some((*ip, client.nym_address))
} else {
None
}
})
.collect()
}
fn disconnect_stopped_client_handlers(&mut self, stopped_clients: Vec<(IpAddr, Recipient)>) {
for (ip, _) in &stopped_clients {
log::info!("Disconnect stopped client: {ip}");
self.clients.remove(ip);
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ip)))
.tap_err(|err| {
log::error!("Failed to send disconnect event: {err}");
})
.ok();
}
}
fn disconnect_inactive_clients(&mut self, inactive_clients: Vec<(IpAddr, Recipient)>) {
for (ip, _) in &inactive_clients {
log::info!("Disconnect inactive client: {ip}");
self.clients.remove(&ip);
self.connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(ip)))
.unwrap();
self.clients.remove(ip);
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ip)))
.tap_err(|err| {
log::error!("Failed to send disconnect event: {err}");
})
.ok();
}
}
@@ -180,9 +188,22 @@ impl ConnectedClients {
}
pub(crate) struct ConnectedClient {
// The nym address of the connected client that we are communicating with on the other side of
// the mixnet
pub(crate) nym_address: Recipient,
// Number of mix node hops that the client has requested to use
pub(crate) mix_hops: Option<u8>,
// Keep track of last activity so we can disconnect inactive clients
pub(crate) last_activity: std::time::Instant,
// Send to connected clients listener to stop. This is option only because we need to take
// ownership of it when the client is dropped.
pub(crate) close_tx: Option<tokio::sync::oneshot::Sender<()>>,
// Handle for the connected client handler
pub(crate) handle: tokio::task::JoinHandle<()>,
}
impl ConnectedClient {
@@ -191,12 +212,47 @@ impl ConnectedClient {
}
}
impl Drop for ConnectedClient {
fn drop(&mut self) {
log::debug!("signal to close client: {}", self.nym_address);
if let Some(close_tx) = self.close_tx.take() {
close_tx.send(()).ok();
}
}
}
type PacketHandleResult = Result<Option<IpPacketResponse>>;
#[cfg(target_os = "linux")]
pub(crate) struct MixnetListener {
// The configuration for the mixnet listener
pub(crate) _config: Config,
// The request filter that we use to check if a packet should be forwarded
pub(crate) request_filter: request_filter::RequestFilter,
// The TUN device that we use to send and receive packets from the internet
pub(crate) tun_writer: tokio::io::WriteHalf<tokio_tun::Tun>,
// The mixnet client that we use to send and receive packets from the mixnet
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,
// The task handle for the main loop
pub(crate) task_handle: TaskHandle,
// The map of connected clients that the mixnet listener keeps track of. It monitors
// activity and disconnects clients that have been inactive for too long.
pub(crate) connected_clients: ConnectedClients,
}
#[cfg(target_os = "linux")]
impl MixnetListener {
// Receving a static connect request from a client with an IP provided that we assign to them,
// if it's available. If it's not available, we send a failure response.
async fn on_static_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::request::StaticConnectRequest,
) -> Result<Option<IpPacketResponse>> {
) -> PacketHandleResult {
log::info!(
"Received static connect request from {sender_address}",
sender_address = connect_request.reply_to
@@ -206,6 +262,8 @@ impl MixnetListener {
let requested_ip = connect_request.ip;
let reply_to = connect_request.reply_to;
let reply_to_hops = connect_request.reply_to_hops;
// TODO: add to connect request
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
// TODO: ignoring reply_to_avg_mix_delays for now
// Check that the IP is available in the set of connected clients
@@ -230,8 +288,25 @@ impl MixnetListener {
}
(false, false) => {
log::info!("Connecting a new client");
self.connected_clients
.connect(requested_ip, reply_to, reply_to_hops);
// Spawn the ConnectedClientHandler for the new client
let (forward_from_tun_tx, close_tx, handle) =
connected_client_handler::ConnectedClientHandler::start(
reply_to,
reply_to_hops,
buffer_timeout,
self.mixnet_client.split_sender(),
);
// Register the new client in the set of connected clients
self.connected_clients.connect(
requested_ip,
reply_to,
reply_to_hops,
forward_from_tun_tx,
close_tx,
handle,
);
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
@@ -258,7 +333,7 @@ impl MixnetListener {
async fn on_dynamic_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::request::DynamicConnectRequest,
) -> Result<Option<IpPacketResponse>> {
) -> PacketHandleResult {
log::info!(
"Received dynamic connect request from {sender_address}",
sender_address = connect_request.reply_to
@@ -267,6 +342,8 @@ impl MixnetListener {
let request_id = connect_request.request_id;
let reply_to = connect_request.reply_to;
let reply_to_hops = connect_request.reply_to_hops;
// TODO: add to connect request
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
// TODO: ignoring reply_to_avg_mix_delays for now
// Check if it's the same client connecting again, then we just reuse the same IP
@@ -298,8 +375,24 @@ impl MixnetListener {
)));
};
self.connected_clients
.connect(new_ip, reply_to, reply_to_hops);
// Spawn the ConnectedClientHandler for the new client
let (forward_from_tun_tx, close_tx, handle) =
connected_client_handler::ConnectedClientHandler::start(
reply_to,
reply_to_hops,
buffer_timeout,
self.mixnet_client.split_sender(),
);
// Register the new client in the set of connected clients
self.connected_clients.connect(
new_ip,
reply_to,
reply_to_hops,
forward_from_tun_tx,
close_tx,
handle,
);
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id, reply_to, new_ip,
)))
@@ -308,15 +401,12 @@ impl MixnetListener {
fn on_disconnect_request(
&self,
_disconnect_request: nym_ip_packet_requests::request::DisconnectRequest,
) -> Result<Option<IpPacketResponse>> {
) -> PacketHandleResult {
log::info!("Received disconnect request: not implemented, dropping");
Ok(None)
}
async fn on_data_request(
&mut self,
data_request: nym_ip_packet_requests::request::DataRequest,
) -> Result<Option<IpPacketResponse>> {
async fn handle_packet(&mut self, ip_packet: &Bytes) -> PacketHandleResult {
log::trace!("Received data request");
// We don't forward packets that we are not able to parse. BUT, there might be a good
@@ -331,10 +421,10 @@ impl MixnetListener {
src_addr,
dst_addr,
dst,
} = parse_packet(&data_request.ip_packet)?;
} = parse_packet(ip_packet)?;
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
log::debug!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
if let Some(connected_client) = self.connected_clients.get_client_from_ip_mut(&src_addr) {
// Keep track of activity so we can disconnect inactive clients
@@ -347,7 +437,7 @@ impl MixnetListener {
if self.request_filter.check_address(&dst).await {
// Forward the packet to the TUN device where it will be routed out to the internet
self.tun_writer
.write_all(&data_request.ip_packet)
.write_all(ip_packet)
.await
.map_err(|_| IpPacketRouterError::FailedToWritePacketToTun)?;
Ok(None)
@@ -362,16 +452,31 @@ impl MixnetListener {
}
} else {
// If the client is not connected, just drop the packet silently
log::info!("Dropping packet: no connected client for {src_addr}");
log::info!("dropping packet from mixnet: no registered client for packet with source: {src_addr}");
Ok(None)
}
}
async fn on_data_request(
&mut self,
data_request: nym_ip_packet_requests::request::DataRequest,
) -> Result<Vec<PacketHandleResult>> {
let mut responses = Vec::new();
let mut decoder = MultiIpPacketCodec::new(nym_ip_packet_requests::codec::BUFFER_TIMEOUT);
let mut bytes = BytesMut::new();
bytes.extend_from_slice(&data_request.ip_packets);
while let Ok(Some(packet)) = decoder.decode(&mut bytes) {
let result = self.handle_packet(&packet).await;
responses.push(result);
}
Ok(responses)
}
fn on_version_mismatch(
&self,
version: u8,
reconstructed: &ReconstructedMessage,
) -> Result<Option<IpPacketResponse>> {
) -> PacketHandleResult {
// If it's possible to parse, do so and return back a response, otherwise just drop
let (id, recipient) = IpPacketRequest::from_reconstructed_message(reconstructed)
.ok()
@@ -393,7 +498,7 @@ impl MixnetListener {
async fn on_reconstructed_message(
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<Option<IpPacketResponse>> {
) -> Result<Vec<PacketHandleResult>> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
@@ -405,7 +510,7 @@ impl MixnetListener {
// backwards compatible.
if *version != nym_ip_packet_requests::CURRENT_VERSION {
log::info!("Received packet with invalid version: v{version}");
return self.on_version_mismatch(*version, &reconstructed);
return Ok(vec![self.on_version_mismatch(*version, &reconstructed)]);
}
}
@@ -414,23 +519,49 @@ impl MixnetListener {
match request.data {
IpPacketRequestData::StaticConnect(connect_request) => {
self.on_static_connect_request(connect_request).await
Ok(vec![self.on_static_connect_request(connect_request).await])
}
IpPacketRequestData::DynamicConnect(connect_request) => {
self.on_dynamic_connect_request(connect_request).await
Ok(vec![self.on_dynamic_connect_request(connect_request).await])
}
IpPacketRequestData::Disconnect(disconnect_request) => {
self.on_disconnect_request(disconnect_request)
Ok(vec![self.on_disconnect_request(disconnect_request)])
}
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
IpPacketRequestData::Ping(_) => {
log::info!("Received ping request: not implemented, dropping");
Ok(vec![])
}
IpPacketRequestData::Health(_) => {
log::info!("Received health request: not implemented, dropping");
Ok(vec![])
}
}
}
fn handle_disconnect_timer(&mut self) {
let stopped_clients = self.connected_clients.get_finished_client_handlers();
let inactive_clients = self.connected_clients.get_inactive_clients();
// TODO: Send disconnect responses to all disconnected clients
//for (ip, nym_address) in stopped_clients.iter().chain(disconnected_clients.iter()) {
// let response = IpPacketResponse::new_unrequested_disconnect(...)
// if let Err(err) = self.handle_response(response).await {
// log::error!("Failed to send disconnect response: {err}");
// }
//}
self.connected_clients
.disconnect_stopped_client_handlers(stopped_clients);
self.connected_clients
.disconnect_inactive_clients(inactive_clients);
}
// When an incoming mixnet message triggers a response that we send back, such as during
// connect handshake.
async fn handle_response(&self, response: IpPacketResponse) -> Result<()> {
let Some(recipient) = response.recipient() else {
log::error!("no recipient in response packet, this should NOT happen!");
log::error!("No recipient in response packet, this should NOT happen!");
return Err(IpPacketRouterError::NoRecipientInResponse);
};
@@ -452,6 +583,26 @@ impl MixnetListener {
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })
}
// A single incoming request can trigger multiple responses, such as when data requests contain
// multiple IP packets.
async fn handle_responses(&self, responses: Vec<PacketHandleResult>) {
for response in responses {
match response {
Ok(Some(response)) => {
if let Err(err) = self.handle_response(response).await {
log::error!("Mixnet listener failed to handle response: {err}");
}
}
Ok(None) => {
continue;
}
Err(err) => {
log::error!("Error handling mixnet message: {err}");
}
}
}
}
pub(crate) async fn run(mut self) -> Result<()> {
let mut task_client = self.task_handle.fork("main_loop");
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
@@ -462,21 +613,14 @@ impl MixnetListener {
log::debug!("IpPacketRouter [main loop]: received shutdown");
},
_ = disconnect_timer.tick() => {
self.connected_clients.disconnect_inactive_clients();
self.handle_disconnect_timer();
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg).await {
Ok(Some(response)) => {
if let Err(err) = self.handle_response(response).await {
log::error!("Mixnet listener failed to handle response: {err}");
}
},
Ok(None) => {
continue;
},
Ok(responses) => self.handle_responses(responses).await,
Err(err) => {
log::error!("Error handling mixnet message: {err}");
log::error!("Error handling reconstructed mixnet message: {err}");
}
};
@@ -502,6 +646,5 @@ pub(crate) struct DisconnectEvent(pub(crate) IpAddr);
pub(crate) struct ConnectEvent {
pub(crate) ip: IpAddr,
pub(crate) nym_address: Recipient,
pub(crate) mix_hops: Option<u8>,
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
@@ -1,22 +1,75 @@
use nym_ip_packet_requests::response::IpPacketResponse;
use nym_sdk::mixnet::MixnetMessageSender;
use std::{collections::HashMap, net::IpAddr};
use nym_task::TaskClient;
#[cfg(target_os = "linux")]
use tokio::io::AsyncReadExt;
use crate::{
error::{IpPacketRouterError, Result},
error::Result,
mixnet_listener::{self},
util::{create_message::create_input_message, parse_ip::parse_dst_addr},
util::parse_ip::parse_dst_addr,
};
// The TUN listener keeps a local map of the connected clients that has its state updated by the
// mixnet listener. Basically it's just so that we don't have to have mutexes around shared state.
// It's even ok if this is slightly out of date
pub(crate) struct ConnectedClientMirror {
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
pub(crate) struct ConnectedClientsListener {
clients: HashMap<IpAddr, ConnectedClientMirror>,
connected_client_rx:
tokio::sync::mpsc::UnboundedReceiver<mixnet_listener::ConnectedClientEvent>,
}
impl ConnectedClientsListener {
pub(crate) fn new(
connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<
mixnet_listener::ConnectedClientEvent,
>,
) -> Self {
ConnectedClientsListener {
clients: HashMap::new(),
connected_client_rx,
}
}
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClientMirror> {
self.clients.get(ip)
}
pub(crate) fn update(&mut self, event: mixnet_listener::ConnectedClientEvent) {
match event {
mixnet_listener::ConnectedClientEvent::Connect(connected_event) => {
let mixnet_listener::ConnectEvent {
ip,
forward_from_tun_tx,
} = *connected_event;
log::trace!("Connect client: {ip}");
self.clients.insert(
ip,
ConnectedClientMirror {
forward_from_tun_tx,
},
);
}
mixnet_listener::ConnectedClientEvent::Disconnect(
mixnet_listener::DisconnectEvent(ip),
) => {
log::trace!("Disconnect client: {ip}");
self.clients.remove(&ip);
}
}
}
}
// Reads packet from TUN and writes to mixnet client
#[cfg(target_os = "linux")]
pub(crate) struct TunListener {
pub(crate) tun_reader: tokio::io::ReadHalf<tokio_tun::Tun>,
pub(crate) mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
pub(crate) task_client: TaskClient,
pub(crate) connected_clients: mixnet_listener::ConnectedClientsListener,
pub(crate) connected_clients: ConnectedClientsListener,
}
#[cfg(target_os = "linux")]
@@ -27,26 +80,22 @@ impl TunListener {
return Ok(());
};
if let Some(mixnet_listener::ConnectedClient {
nym_address,
mix_hops,
..
if let Some(ConnectedClientMirror {
forward_from_tun_tx,
}) = self.connected_clients.get(&dst_addr)
{
let packet = buf[..len].to_vec();
let response_packet = IpPacketResponse::new_ip_packet(packet.into())
.to_bytes()
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
source: err,
})?;
let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
self.mixnet_client_sender
.send(input_message)
.await
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
if forward_from_tun_tx.send(packet).is_err() {
log::warn!("Failed to forward packet to connected client {dst_addr}: disconnecting it from tun listener");
self.connected_clients
.update(mixnet_listener::ConnectedClientEvent::Disconnect(
mixnet_listener::DisconnectEvent(dst_addr),
));
}
} else {
log::info!("No registered nym-address for packet - dropping");
log::info!(
"dropping packet from network: no registered client for destination: {dst_addr}"
);
}
Ok(())