|
|
|
@@ -4,6 +4,7 @@ use std::{collections::HashMap, net::SocketAddr};
|
|
|
|
|
|
|
|
|
|
use bytes::{Bytes, BytesMut};
|
|
|
|
|
use futures::StreamExt;
|
|
|
|
|
use nym_ip_packet_requests::v7::request::{HealthRequest, PingRequest};
|
|
|
|
|
use nym_ip_packet_requests::v7::response::{
|
|
|
|
|
DynamicConnectFailureReason, InfoLevel, InfoResponseReply, StaticConnectFailureReason,
|
|
|
|
|
};
|
|
|
|
@@ -95,6 +96,33 @@ impl ConnectedClients {
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn remove_client_with_nym_address(&self, nym_address: &Recipient) {
|
|
|
|
|
// Remove the client from both the ipv4 and ipv6 maps
|
|
|
|
|
let ipv4 = self.clients_ipv4_mapping.iter().find_map(|(ip, client)| {
|
|
|
|
|
if client.nym_address == *nym_address {
|
|
|
|
|
Some(ip)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
let client_ipv4 = self.clients_ipv4_mapping.remove(ipv4);
|
|
|
|
|
|
|
|
|
|
let ipv6 = self.clients_ipv6_mapping.iter().find_map(|(ip, client)| {
|
|
|
|
|
if client.nym_address == *nym_address {
|
|
|
|
|
Some(ip)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
let client_ipv6 = self.clients_ipv6_mapping.remove(ipv6);
|
|
|
|
|
|
|
|
|
|
// These two should be the same
|
|
|
|
|
if let Some(client) = client_ipv4 {
|
|
|
|
|
// client.update_activity()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
fn lookup_client_from_nym_address(&self, nym_address: &Recipient) -> Option<&ConnectedClient> {
|
|
|
|
|
self.clients_ipv4_mapping
|
|
|
|
|
.iter()
|
|
|
|
@@ -111,7 +139,7 @@ impl ConnectedClients {
|
|
|
|
|
&mut self,
|
|
|
|
|
ips: IpPair,
|
|
|
|
|
nym_address: Recipient,
|
|
|
|
|
mix_hops: Option<u8>,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
|
|
|
|
close_tx: tokio::sync::oneshot::Sender<()>,
|
|
|
|
|
handle: tokio::task::JoinHandle<()>,
|
|
|
|
@@ -121,8 +149,8 @@ impl ConnectedClients {
|
|
|
|
|
let client = ConnectedClient {
|
|
|
|
|
nym_address,
|
|
|
|
|
ipv6: ips.ipv6,
|
|
|
|
|
mix_hops,
|
|
|
|
|
last_activity: Arc::new(RwLock::new(std::time::Instant::now())),
|
|
|
|
|
client_version,
|
|
|
|
|
_close_tx: Arc::new(CloseTx {
|
|
|
|
|
nym_address,
|
|
|
|
|
inner: Some(close_tx),
|
|
|
|
@@ -155,7 +183,7 @@ impl ConnectedClients {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Identify connected client handlers that have stopped without being told to stop
|
|
|
|
|
fn get_finished_client_handlers(&mut self) -> Vec<(IpPair, Recipient)> {
|
|
|
|
|
fn get_finished_client_handlers(&mut self) -> Vec<(IpPair, Recipient, SupportedClientVersion)> {
|
|
|
|
|
self.clients_ipv4_mapping
|
|
|
|
|
.iter_mut()
|
|
|
|
|
.filter_map(|(ip, connected_client)| {
|
|
|
|
@@ -163,6 +191,7 @@ impl ConnectedClients {
|
|
|
|
|
Some((
|
|
|
|
|
IpPair::new(*ip, connected_client.ipv6),
|
|
|
|
|
connected_client.nym_address,
|
|
|
|
|
connected_client.client_version,
|
|
|
|
|
))
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
@@ -171,7 +200,7 @@ impl ConnectedClients {
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_inactive_clients(&mut self) -> Vec<(IpPair, Recipient)> {
|
|
|
|
|
async fn get_inactive_clients(&mut self) -> Vec<(IpPair, Recipient, SupportedClientVersion)> {
|
|
|
|
|
let now = std::time::Instant::now();
|
|
|
|
|
let mut ret = vec![];
|
|
|
|
|
for (ip, connected_client) in self.clients_ipv4_mapping.iter() {
|
|
|
|
@@ -181,37 +210,41 @@ impl ConnectedClients {
|
|
|
|
|
ret.push((
|
|
|
|
|
IpPair::new(*ip, connected_client.ipv6),
|
|
|
|
|
connected_client.nym_address,
|
|
|
|
|
connected_client.client_version,
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ret
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn disconnect_stopped_client_handlers(&mut self, stopped_clients: Vec<(IpPair, Recipient)>) {
|
|
|
|
|
for (ips, _) in &stopped_clients {
|
|
|
|
|
fn disconnect(&mut self, ips: &IpPair) {
|
|
|
|
|
self.clients_ipv4_mapping.remove(&ips.ipv4);
|
|
|
|
|
self.clients_ipv6_mapping.remove(&ips.ipv6);
|
|
|
|
|
self.tun_listener_connected_client_tx
|
|
|
|
|
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
|
|
|
|
|
.inspect_err(|err| {
|
|
|
|
|
log::error!("Failed to send disconnect event: {err}");
|
|
|
|
|
})
|
|
|
|
|
.ok();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn disconnect_stopped_client_handlers(
|
|
|
|
|
&mut self,
|
|
|
|
|
stopped_clients: Vec<(IpPair, Recipient, SupportedClientVersion)>,
|
|
|
|
|
) {
|
|
|
|
|
for (ips, _, _) in &stopped_clients {
|
|
|
|
|
log::info!("Disconnect stopped client: {ips}");
|
|
|
|
|
self.clients_ipv4_mapping.remove(&ips.ipv4);
|
|
|
|
|
self.clients_ipv6_mapping.remove(&ips.ipv6);
|
|
|
|
|
self.tun_listener_connected_client_tx
|
|
|
|
|
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
|
|
|
|
|
.tap_err(|err| {
|
|
|
|
|
log::error!("Failed to send disconnect event: {err}");
|
|
|
|
|
})
|
|
|
|
|
.ok();
|
|
|
|
|
self.disconnect(ips);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn disconnect_inactive_clients(&mut self, inactive_clients: Vec<(IpPair, Recipient)>) {
|
|
|
|
|
for (ips, _) in &inactive_clients {
|
|
|
|
|
fn disconnect_inactive_clients(
|
|
|
|
|
&mut self,
|
|
|
|
|
inactive_clients: Vec<(IpPair, Recipient, SupportedClientVersion)>,
|
|
|
|
|
) {
|
|
|
|
|
for (ips, _, _) in &inactive_clients {
|
|
|
|
|
log::info!("Disconnect inactive client: {ips}");
|
|
|
|
|
self.clients_ipv4_mapping.remove(&ips.ipv4);
|
|
|
|
|
self.clients_ipv6_mapping.remove(&ips.ipv6);
|
|
|
|
|
self.tun_listener_connected_client_tx
|
|
|
|
|
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
|
|
|
|
|
.tap_err(|err| {
|
|
|
|
|
log::error!("Failed to send disconnect event: {err}");
|
|
|
|
|
})
|
|
|
|
|
.ok();
|
|
|
|
|
self.disconnect(ips);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -236,12 +269,12 @@ pub(crate) struct ConnectedClient {
|
|
|
|
|
// The assigned IPv6 address of this client
|
|
|
|
|
pub(crate) ipv6: Ipv6Addr,
|
|
|
|
|
|
|
|
|
|
// Number of mix node hops that the client has requested to use
|
|
|
|
|
pub(crate) mix_hops: Option<u8>,
|
|
|
|
|
|
|
|
|
|
// Keep track of last activity so we can disconnect inactive clients
|
|
|
|
|
pub(crate) last_activity: Arc<RwLock<std::time::Instant>>,
|
|
|
|
|
|
|
|
|
|
// The version of the client, since we need to know this to send the correct response
|
|
|
|
|
pub(crate) client_version: SupportedClientVersion,
|
|
|
|
|
|
|
|
|
|
pub(crate) _close_tx: Arc<CloseTx>,
|
|
|
|
|
|
|
|
|
|
// Handle for the connected client handler
|
|
|
|
@@ -378,6 +411,56 @@ impl Response {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn new_pong(
|
|
|
|
|
request_id: u64,
|
|
|
|
|
reply_to: Recipient,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> Self {
|
|
|
|
|
match client_version {
|
|
|
|
|
SupportedClientVersion::V6 => Response::V6(v6::response::IpPacketResponse::new_pong(
|
|
|
|
|
request_id, reply_to,
|
|
|
|
|
)),
|
|
|
|
|
SupportedClientVersion::V7 => Response::V7(v7::response::IpPacketResponse::new_pong(
|
|
|
|
|
request_id, reply_to,
|
|
|
|
|
)),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn new_health_response(
|
|
|
|
|
request_id: u64,
|
|
|
|
|
reply_to: Recipient,
|
|
|
|
|
build_info: nym_bin_common::build_information::BinaryBuildInformationOwned,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> Self {
|
|
|
|
|
match client_version {
|
|
|
|
|
SupportedClientVersion::V6 => {
|
|
|
|
|
Response::V6(v6::response::IpPacketResponse::new_health_response(
|
|
|
|
|
request_id, reply_to, build_info, None,
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
SupportedClientVersion::V7 => {
|
|
|
|
|
Response::V7(v7::response::IpPacketResponse::new_health_response(
|
|
|
|
|
request_id, reply_to, build_info, None,
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn new_unrequested_disconnect(
|
|
|
|
|
reply_to: Recipient,
|
|
|
|
|
reason: v7::response::UnrequestedDisconnectReason,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> Self {
|
|
|
|
|
match client_version {
|
|
|
|
|
SupportedClientVersion::V6 => Response::V6(
|
|
|
|
|
v6::response::IpPacketResponse::new_unrequested_disconnect(reply_to, reason.into()),
|
|
|
|
|
),
|
|
|
|
|
SupportedClientVersion::V7 => Response::V7(
|
|
|
|
|
v7::response::IpPacketResponse::new_unrequested_disconnect(reply_to, reason),
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn to_bytes(&self) -> Result<Vec<u8>> {
|
|
|
|
|
match self {
|
|
|
|
|
Response::V6(response) => response.to_bytes(),
|
|
|
|
@@ -429,10 +512,8 @@ impl MixnetListener {
|
|
|
|
|
let request_id = connect_request.request_id;
|
|
|
|
|
let requested_ips = connect_request.ips;
|
|
|
|
|
let reply_to = connect_request.reply_to;
|
|
|
|
|
let reply_to_hops = connect_request.reply_to_hops;
|
|
|
|
|
// TODO: add to connect request
|
|
|
|
|
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
|
|
|
|
|
// TODO: ignoring reply_to_avg_mix_delays for now
|
|
|
|
|
|
|
|
|
|
// Check that the IP is available in the set of connected clients
|
|
|
|
|
let is_ip_taken = self.connected_clients.is_ip_connected(&requested_ips);
|
|
|
|
@@ -464,7 +545,6 @@ impl MixnetListener {
|
|
|
|
|
let (forward_from_tun_tx, close_tx, handle) =
|
|
|
|
|
connected_client_handler::ConnectedClientHandler::start(
|
|
|
|
|
reply_to,
|
|
|
|
|
reply_to_hops,
|
|
|
|
|
buffer_timeout,
|
|
|
|
|
client_version,
|
|
|
|
|
self.mixnet_client.split_sender(),
|
|
|
|
@@ -474,7 +554,7 @@ impl MixnetListener {
|
|
|
|
|
self.connected_clients.connect(
|
|
|
|
|
requested_ips,
|
|
|
|
|
reply_to,
|
|
|
|
|
reply_to_hops,
|
|
|
|
|
client_version,
|
|
|
|
|
forward_from_tun_tx,
|
|
|
|
|
close_tx,
|
|
|
|
|
handle,
|
|
|
|
@@ -518,10 +598,8 @@ impl MixnetListener {
|
|
|
|
|
|
|
|
|
|
let request_id = connect_request.request_id;
|
|
|
|
|
let reply_to = connect_request.reply_to;
|
|
|
|
|
let reply_to_hops = connect_request.reply_to_hops;
|
|
|
|
|
// TODO: add to connect request
|
|
|
|
|
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
|
|
|
|
|
// TODO: ignoring reply_to_avg_mix_delays for now
|
|
|
|
|
|
|
|
|
|
// Check if it's the same client connecting again, then we just reuse the same IP
|
|
|
|
|
// TODO: this is problematic. Until we sign connect requests this means you can spam people
|
|
|
|
@@ -559,7 +637,6 @@ impl MixnetListener {
|
|
|
|
|
let (forward_from_tun_tx, close_tx, handle) =
|
|
|
|
|
connected_client_handler::ConnectedClientHandler::start(
|
|
|
|
|
reply_to,
|
|
|
|
|
reply_to_hops,
|
|
|
|
|
buffer_timeout,
|
|
|
|
|
client_version,
|
|
|
|
|
self.mixnet_client.split_sender(),
|
|
|
|
@@ -569,7 +646,7 @@ impl MixnetListener {
|
|
|
|
|
self.connected_clients.connect(
|
|
|
|
|
new_ips,
|
|
|
|
|
reply_to,
|
|
|
|
|
reply_to_hops,
|
|
|
|
|
client_version,
|
|
|
|
|
forward_from_tun_tx,
|
|
|
|
|
close_tx,
|
|
|
|
|
handle,
|
|
|
|
@@ -584,11 +661,16 @@ impl MixnetListener {
|
|
|
|
|
|
|
|
|
|
fn on_disconnect_request(
|
|
|
|
|
&self,
|
|
|
|
|
_disconnect_request: DisconnectRequest,
|
|
|
|
|
_client_version: SupportedClientVersion,
|
|
|
|
|
disconnect_request: DisconnectRequest,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> PacketHandleResult {
|
|
|
|
|
log::info!("Received disconnect request: not implemented, dropping");
|
|
|
|
|
Ok(None)
|
|
|
|
|
log::info!("Received disconnect request");
|
|
|
|
|
|
|
|
|
|
let request_id = disconnect_request.request_id;
|
|
|
|
|
let reply_to = disconnect_request.reply_to;
|
|
|
|
|
|
|
|
|
|
let ips = self.connected_clients.lookup_ip_from_nym_address(&reply_to);
|
|
|
|
|
self.connected_clients.disconnect(&ips);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_packet(
|
|
|
|
@@ -664,6 +746,48 @@ impl MixnetListener {
|
|
|
|
|
Ok(responses)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn on_ping_request(
|
|
|
|
|
&self,
|
|
|
|
|
ping_request: PingRequest,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> PacketHandleResult {
|
|
|
|
|
log::info!(
|
|
|
|
|
"Received ping request from {sender_address}",
|
|
|
|
|
sender_address = ping_request.reply_to
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let reply_to = ping_request.reply_to;
|
|
|
|
|
let request_id = ping_request.request_id;
|
|
|
|
|
|
|
|
|
|
Ok(Some(Response::new_pong(
|
|
|
|
|
request_id,
|
|
|
|
|
reply_to,
|
|
|
|
|
client_version,
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn on_health_request(
|
|
|
|
|
&self,
|
|
|
|
|
health_request: HealthRequest,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> PacketHandleResult {
|
|
|
|
|
log::info!(
|
|
|
|
|
"Received health request from {sender_address}",
|
|
|
|
|
sender_address = health_request.reply_to
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let reply_to = health_request.reply_to;
|
|
|
|
|
let request_id = health_request.request_id;
|
|
|
|
|
let build_info = nym_bin_common::bin_info_owned!();
|
|
|
|
|
|
|
|
|
|
Ok(Some(Response::new_health_response(
|
|
|
|
|
request_id,
|
|
|
|
|
reply_to,
|
|
|
|
|
build_info,
|
|
|
|
|
client_version,
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn on_version_mismatch(
|
|
|
|
|
&self,
|
|
|
|
|
_version: u8,
|
|
|
|
@@ -718,13 +842,11 @@ impl MixnetListener {
|
|
|
|
|
IpPacketRequestData::Data(data_request) => {
|
|
|
|
|
self.on_data_request(data_request, client_version).await
|
|
|
|
|
}
|
|
|
|
|
IpPacketRequestData::Ping(_) => {
|
|
|
|
|
log::info!("Received ping request: not implemented, dropping");
|
|
|
|
|
Ok(vec![])
|
|
|
|
|
IpPacketRequestData::Ping(ping_request) => {
|
|
|
|
|
Ok(vec![self.on_ping_request(ping_request, client_version)])
|
|
|
|
|
}
|
|
|
|
|
IpPacketRequestData::Health(_) => {
|
|
|
|
|
log::info!("Received health request: not implemented, dropping");
|
|
|
|
|
Ok(vec![])
|
|
|
|
|
IpPacketRequestData::Health(health_request) => {
|
|
|
|
|
Ok(vec![self.on_health_request(health_request, client_version)])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -733,13 +855,27 @@ impl MixnetListener {
|
|
|
|
|
let stopped_clients = self.connected_clients.get_finished_client_handlers();
|
|
|
|
|
let inactive_clients = self.connected_clients.get_inactive_clients().await;
|
|
|
|
|
|
|
|
|
|
// TODO: Send disconnect responses to all disconnected clients
|
|
|
|
|
//for (ip, nym_address) in stopped_clients.iter().chain(disconnected_clients.iter()) {
|
|
|
|
|
// let response = IpPacketResponse::new_unrequested_disconnect(...)
|
|
|
|
|
// if let Err(err) = self.handle_response(response).await {
|
|
|
|
|
// log::error!("Failed to send disconnect response: {err}");
|
|
|
|
|
// }
|
|
|
|
|
//}
|
|
|
|
|
// WIP(JON): confirm we should send disconnect on handle stopped
|
|
|
|
|
for (_ip, nym_address, client_version) in &stopped_clients {
|
|
|
|
|
let response = Response::new_unrequested_disconnect(
|
|
|
|
|
*nym_address,
|
|
|
|
|
v7::response::UnrequestedDisconnectReason::Other("handler stopped".to_string()),
|
|
|
|
|
*client_version,
|
|
|
|
|
);
|
|
|
|
|
if let Err(err) = self.handle_response(response).await {
|
|
|
|
|
log::error!("Failed to send disconnect response: {err}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (_ip, nym_address, client_version) in &inactive_clients {
|
|
|
|
|
let response = Response::new_unrequested_disconnect(
|
|
|
|
|
*nym_address,
|
|
|
|
|
v7::response::UnrequestedDisconnectReason::ClientMixnetTrafficTimeout,
|
|
|
|
|
*client_version,
|
|
|
|
|
);
|
|
|
|
|
if let Err(err) = self.handle_response(response).await {
|
|
|
|
|
log::error!("Failed to send disconnect response: {err}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.connected_clients
|
|
|
|
|
.disconnect_stopped_client_handlers(stopped_clients);
|
|
|
|
@@ -757,17 +893,7 @@ impl MixnetListener {
|
|
|
|
|
|
|
|
|
|
let response_packet = response.to_bytes()?;
|
|
|
|
|
|
|
|
|
|
// We could avoid this lookup if we check this when we create the response.
|
|
|
|
|
let mix_hops = if let Some(c) = self
|
|
|
|
|
.connected_clients
|
|
|
|
|
.lookup_client_from_nym_address(recipient)
|
|
|
|
|
{
|
|
|
|
|
c.mix_hops
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let input_message = create_input_message(*recipient, response_packet, mix_hops);
|
|
|
|
|
let input_message = create_input_message(*recipient, response_packet);
|
|
|
|
|
self.mixnet_client
|
|
|
|
|
.send(input_message)
|
|
|
|
|
.await
|
|
|
|
@@ -881,14 +1007,9 @@ fn verify_signed_request(
|
|
|
|
|
request: &impl SignedRequest,
|
|
|
|
|
client_version: SupportedClientVersion,
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
if let Err(err) = request.verify() {
|
|
|
|
|
// If the client is V6, we don't care about missing signature
|
|
|
|
|
if client_version == SupportedClientVersion::V6 {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
return Err(IpPacketRouterError::FailedToVerifyRequest { source: err });
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
request
|
|
|
|
|
.verify()
|
|
|
|
|
.map_err(|source| IpPacketRouterError::FailedToVerifyRequest { source })
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) enum ConnectedClientEvent {
|
|
|
|
|