Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Häggblad 7727c6747c Enforce verifying signed requests 2024-09-25 09:44:58 +02:00
Jon Häggblad 64ba86ed08 wip: start handling disconect 2024-09-24 09:18:44 +02:00
Jon Häggblad d4f1b59d2b Remove 2-hop support in IPR 2024-09-24 08:44:33 +02:00
Jon Häggblad 35ea6fd179 Send unrequested disconnect messages 2024-09-24 05:01:48 +02:00
Jon Häggblad 2cba42411f Respond to ping and health requests 2024-09-24 04:27:20 +02:00
4 changed files with 210 additions and 85 deletions
@@ -67,3 +67,19 @@ impl From<v7::response::InfoLevel> for v6::response::InfoLevel {
}
}
}
impl From<v7::response::UnrequestedDisconnectReason> for v6::response::UnrequestedDisconnectReason {
fn from(reason: v7::response::UnrequestedDisconnectReason) -> Self {
match reason {
v7::response::UnrequestedDisconnectReason::ClientMixnetTrafficTimeout => {
v6::response::UnrequestedDisconnectReason::ClientMixnetTrafficTimeout
}
v7::response::UnrequestedDisconnectReason::ClientTunTrafficTimeout => {
v6::response::UnrequestedDisconnectReason::ClientTunTrafficTimeout
}
v7::response::UnrequestedDisconnectReason::Other(reason) => {
v6::response::UnrequestedDisconnectReason::Other(reason)
}
}
}
}
@@ -22,9 +22,6 @@ pub(crate) struct ConnectedClientHandler {
// The address of the client that this handler is connected to
nym_address: Recipient,
// The number of hops the packet should take before reaching the client
mix_hops: Option<u8>,
// Channel to receive packets from the tun_listener
forward_from_tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
@@ -47,7 +44,6 @@ pub(crate) struct ConnectedClientHandler {
impl ConnectedClientHandler {
pub(crate) fn start(
reply_to: Recipient,
reply_to_hops: Option<u8>,
buffer_timeout: std::time::Duration,
client_version: SupportedClientVersion,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
@@ -67,7 +63,6 @@ impl ConnectedClientHandler {
let connected_client_handler = ConnectedClientHandler {
nym_address: reply_to,
mix_hops: reply_to_hops,
forward_from_tun_rx,
mixnet_client_sender,
close_rx,
@@ -98,7 +93,7 @@ impl ConnectedClientHandler {
}
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })?;
let input_message = create_input_message(self.nym_address, response_packet, self.mix_hops);
let input_message = create_input_message(self.nym_address, response_packet);
self.mixnet_client_sender
.send(input_message)
@@ -4,6 +4,7 @@ use std::{collections::HashMap, net::SocketAddr};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use nym_ip_packet_requests::v7::request::{HealthRequest, PingRequest};
use nym_ip_packet_requests::v7::response::{
DynamicConnectFailureReason, InfoLevel, InfoResponseReply, StaticConnectFailureReason,
};
@@ -95,6 +96,33 @@ impl ConnectedClients {
})
}
fn remove_client_with_nym_address(&self, nym_address: &Recipient) {
// Remove the client from both the ipv4 and ipv6 maps
let ipv4 = self.clients_ipv4_mapping.iter().find_map(|(ip, client)| {
if client.nym_address == *nym_address {
Some(ip)
} else {
None
}
});
let client_ipv4 = self.clients_ipv4_mapping.remove(ipv4);
let ipv6 = self.clients_ipv6_mapping.iter().find_map(|(ip, client)| {
if client.nym_address == *nym_address {
Some(ip)
} else {
None
}
});
let client_ipv6 = self.clients_ipv6_mapping.remove(ipv6);
// These two should be the same
if let Some(client) = client_ipv4 {
// client.update_activity()
}
}
#[allow(dead_code)]
fn lookup_client_from_nym_address(&self, nym_address: &Recipient) -> Option<&ConnectedClient> {
self.clients_ipv4_mapping
.iter()
@@ -111,7 +139,7 @@ impl ConnectedClients {
&mut self,
ips: IpPair,
nym_address: Recipient,
mix_hops: Option<u8>,
client_version: SupportedClientVersion,
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
close_tx: tokio::sync::oneshot::Sender<()>,
handle: tokio::task::JoinHandle<()>,
@@ -121,8 +149,8 @@ impl ConnectedClients {
let client = ConnectedClient {
nym_address,
ipv6: ips.ipv6,
mix_hops,
last_activity: Arc::new(RwLock::new(std::time::Instant::now())),
client_version,
_close_tx: Arc::new(CloseTx {
nym_address,
inner: Some(close_tx),
@@ -155,7 +183,7 @@ impl ConnectedClients {
}
// Identify connected client handlers that have stopped without being told to stop
fn get_finished_client_handlers(&mut self) -> Vec<(IpPair, Recipient)> {
fn get_finished_client_handlers(&mut self) -> Vec<(IpPair, Recipient, SupportedClientVersion)> {
self.clients_ipv4_mapping
.iter_mut()
.filter_map(|(ip, connected_client)| {
@@ -163,6 +191,7 @@ impl ConnectedClients {
Some((
IpPair::new(*ip, connected_client.ipv6),
connected_client.nym_address,
connected_client.client_version,
))
} else {
None
@@ -171,7 +200,7 @@ impl ConnectedClients {
.collect()
}
async fn get_inactive_clients(&mut self) -> Vec<(IpPair, Recipient)> {
async fn get_inactive_clients(&mut self) -> Vec<(IpPair, Recipient, SupportedClientVersion)> {
let now = std::time::Instant::now();
let mut ret = vec![];
for (ip, connected_client) in self.clients_ipv4_mapping.iter() {
@@ -181,37 +210,41 @@ impl ConnectedClients {
ret.push((
IpPair::new(*ip, connected_client.ipv6),
connected_client.nym_address,
connected_client.client_version,
))
}
}
ret
}
fn disconnect_stopped_client_handlers(&mut self, stopped_clients: Vec<(IpPair, Recipient)>) {
for (ips, _) in &stopped_clients {
fn disconnect(&mut self, ips: &IpPair) {
self.clients_ipv4_mapping.remove(&ips.ipv4);
self.clients_ipv6_mapping.remove(&ips.ipv6);
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
.inspect_err(|err| {
log::error!("Failed to send disconnect event: {err}");
})
.ok();
}
fn disconnect_stopped_client_handlers(
&mut self,
stopped_clients: Vec<(IpPair, Recipient, SupportedClientVersion)>,
) {
for (ips, _, _) in &stopped_clients {
log::info!("Disconnect stopped client: {ips}");
self.clients_ipv4_mapping.remove(&ips.ipv4);
self.clients_ipv6_mapping.remove(&ips.ipv6);
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
.tap_err(|err| {
log::error!("Failed to send disconnect event: {err}");
})
.ok();
self.disconnect(ips);
}
}
fn disconnect_inactive_clients(&mut self, inactive_clients: Vec<(IpPair, Recipient)>) {
for (ips, _) in &inactive_clients {
fn disconnect_inactive_clients(
&mut self,
inactive_clients: Vec<(IpPair, Recipient, SupportedClientVersion)>,
) {
for (ips, _, _) in &inactive_clients {
log::info!("Disconnect inactive client: {ips}");
self.clients_ipv4_mapping.remove(&ips.ipv4);
self.clients_ipv6_mapping.remove(&ips.ipv6);
self.tun_listener_connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(*ips)))
.tap_err(|err| {
log::error!("Failed to send disconnect event: {err}");
})
.ok();
self.disconnect(ips);
}
}
@@ -236,12 +269,12 @@ pub(crate) struct ConnectedClient {
// The assigned IPv6 address of this client
pub(crate) ipv6: Ipv6Addr,
// Number of mix node hops that the client has requested to use
pub(crate) mix_hops: Option<u8>,
// Keep track of last activity so we can disconnect inactive clients
pub(crate) last_activity: Arc<RwLock<std::time::Instant>>,
// The version of the client, since we need to know this to send the correct response
pub(crate) client_version: SupportedClientVersion,
pub(crate) _close_tx: Arc<CloseTx>,
// Handle for the connected client handler
@@ -378,6 +411,56 @@ impl Response {
}
}
fn new_pong(
request_id: u64,
reply_to: Recipient,
client_version: SupportedClientVersion,
) -> Self {
match client_version {
SupportedClientVersion::V6 => Response::V6(v6::response::IpPacketResponse::new_pong(
request_id, reply_to,
)),
SupportedClientVersion::V7 => Response::V7(v7::response::IpPacketResponse::new_pong(
request_id, reply_to,
)),
}
}
fn new_health_response(
request_id: u64,
reply_to: Recipient,
build_info: nym_bin_common::build_information::BinaryBuildInformationOwned,
client_version: SupportedClientVersion,
) -> Self {
match client_version {
SupportedClientVersion::V6 => {
Response::V6(v6::response::IpPacketResponse::new_health_response(
request_id, reply_to, build_info, None,
))
}
SupportedClientVersion::V7 => {
Response::V7(v7::response::IpPacketResponse::new_health_response(
request_id, reply_to, build_info, None,
))
}
}
}
fn new_unrequested_disconnect(
reply_to: Recipient,
reason: v7::response::UnrequestedDisconnectReason,
client_version: SupportedClientVersion,
) -> Self {
match client_version {
SupportedClientVersion::V6 => Response::V6(
v6::response::IpPacketResponse::new_unrequested_disconnect(reply_to, reason.into()),
),
SupportedClientVersion::V7 => Response::V7(
v7::response::IpPacketResponse::new_unrequested_disconnect(reply_to, reason),
),
}
}
fn to_bytes(&self) -> Result<Vec<u8>> {
match self {
Response::V6(response) => response.to_bytes(),
@@ -429,10 +512,8 @@ impl MixnetListener {
let request_id = connect_request.request_id;
let requested_ips = connect_request.ips;
let reply_to = connect_request.reply_to;
let reply_to_hops = connect_request.reply_to_hops;
// TODO: add to connect request
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
// TODO: ignoring reply_to_avg_mix_delays for now
// Check that the IP is available in the set of connected clients
let is_ip_taken = self.connected_clients.is_ip_connected(&requested_ips);
@@ -464,7 +545,6 @@ impl MixnetListener {
let (forward_from_tun_tx, close_tx, handle) =
connected_client_handler::ConnectedClientHandler::start(
reply_to,
reply_to_hops,
buffer_timeout,
client_version,
self.mixnet_client.split_sender(),
@@ -474,7 +554,7 @@ impl MixnetListener {
self.connected_clients.connect(
requested_ips,
reply_to,
reply_to_hops,
client_version,
forward_from_tun_tx,
close_tx,
handle,
@@ -518,10 +598,8 @@ impl MixnetListener {
let request_id = connect_request.request_id;
let reply_to = connect_request.reply_to;
let reply_to_hops = connect_request.reply_to_hops;
// TODO: add to connect request
let buffer_timeout = nym_ip_packet_requests::codec::BUFFER_TIMEOUT;
// TODO: ignoring reply_to_avg_mix_delays for now
// Check if it's the same client connecting again, then we just reuse the same IP
// TODO: this is problematic. Until we sign connect requests this means you can spam people
@@ -559,7 +637,6 @@ impl MixnetListener {
let (forward_from_tun_tx, close_tx, handle) =
connected_client_handler::ConnectedClientHandler::start(
reply_to,
reply_to_hops,
buffer_timeout,
client_version,
self.mixnet_client.split_sender(),
@@ -569,7 +646,7 @@ impl MixnetListener {
self.connected_clients.connect(
new_ips,
reply_to,
reply_to_hops,
client_version,
forward_from_tun_tx,
close_tx,
handle,
@@ -584,11 +661,16 @@ impl MixnetListener {
fn on_disconnect_request(
&self,
_disconnect_request: DisconnectRequest,
_client_version: SupportedClientVersion,
disconnect_request: DisconnectRequest,
client_version: SupportedClientVersion,
) -> PacketHandleResult {
log::info!("Received disconnect request: not implemented, dropping");
Ok(None)
log::info!("Received disconnect request");
let request_id = disconnect_request.request_id;
let reply_to = disconnect_request.reply_to;
let ips = self.connected_clients.lookup_ip_from_nym_address(&reply_to);
self.connected_clients.disconnect(&ips);
}
async fn handle_packet(
@@ -664,6 +746,48 @@ impl MixnetListener {
Ok(responses)
}
fn on_ping_request(
&self,
ping_request: PingRequest,
client_version: SupportedClientVersion,
) -> PacketHandleResult {
log::info!(
"Received ping request from {sender_address}",
sender_address = ping_request.reply_to
);
let reply_to = ping_request.reply_to;
let request_id = ping_request.request_id;
Ok(Some(Response::new_pong(
request_id,
reply_to,
client_version,
)))
}
fn on_health_request(
&self,
health_request: HealthRequest,
client_version: SupportedClientVersion,
) -> PacketHandleResult {
log::info!(
"Received health request from {sender_address}",
sender_address = health_request.reply_to
);
let reply_to = health_request.reply_to;
let request_id = health_request.request_id;
let build_info = nym_bin_common::bin_info_owned!();
Ok(Some(Response::new_health_response(
request_id,
reply_to,
build_info,
client_version,
)))
}
fn on_version_mismatch(
&self,
_version: u8,
@@ -718,13 +842,11 @@ impl MixnetListener {
IpPacketRequestData::Data(data_request) => {
self.on_data_request(data_request, client_version).await
}
IpPacketRequestData::Ping(_) => {
log::info!("Received ping request: not implemented, dropping");
Ok(vec![])
IpPacketRequestData::Ping(ping_request) => {
Ok(vec![self.on_ping_request(ping_request, client_version)])
}
IpPacketRequestData::Health(_) => {
log::info!("Received health request: not implemented, dropping");
Ok(vec![])
IpPacketRequestData::Health(health_request) => {
Ok(vec![self.on_health_request(health_request, client_version)])
}
}
}
@@ -733,13 +855,27 @@ impl MixnetListener {
let stopped_clients = self.connected_clients.get_finished_client_handlers();
let inactive_clients = self.connected_clients.get_inactive_clients().await;
// TODO: Send disconnect responses to all disconnected clients
//for (ip, nym_address) in stopped_clients.iter().chain(disconnected_clients.iter()) {
// let response = IpPacketResponse::new_unrequested_disconnect(...)
// if let Err(err) = self.handle_response(response).await {
// log::error!("Failed to send disconnect response: {err}");
// }
//}
// WIP(JON): confirm we should send disconnect on handle stopped
for (_ip, nym_address, client_version) in &stopped_clients {
let response = Response::new_unrequested_disconnect(
*nym_address,
v7::response::UnrequestedDisconnectReason::Other("handler stopped".to_string()),
*client_version,
);
if let Err(err) = self.handle_response(response).await {
log::error!("Failed to send disconnect response: {err}");
}
}
for (_ip, nym_address, client_version) in &inactive_clients {
let response = Response::new_unrequested_disconnect(
*nym_address,
v7::response::UnrequestedDisconnectReason::ClientMixnetTrafficTimeout,
*client_version,
);
if let Err(err) = self.handle_response(response).await {
log::error!("Failed to send disconnect response: {err}");
}
}
self.connected_clients
.disconnect_stopped_client_handlers(stopped_clients);
@@ -757,17 +893,7 @@ impl MixnetListener {
let response_packet = response.to_bytes()?;
// We could avoid this lookup if we check this when we create the response.
let mix_hops = if let Some(c) = self
.connected_clients
.lookup_client_from_nym_address(recipient)
{
c.mix_hops
} else {
None
};
let input_message = create_input_message(*recipient, response_packet, mix_hops);
let input_message = create_input_message(*recipient, response_packet);
self.mixnet_client
.send(input_message)
.await
@@ -881,14 +1007,9 @@ fn verify_signed_request(
request: &impl SignedRequest,
client_version: SupportedClientVersion,
) -> Result<()> {
if let Err(err) = request.verify() {
// If the client is V6, we don't care about missing signature
if client_version == SupportedClientVersion::V6 {
return Ok(());
}
return Err(IpPacketRouterError::FailedToVerifyRequest { source: err });
}
Ok(())
request
.verify()
.map_err(|source| IpPacketRouterError::FailedToVerifyRequest { source })
}
pub(crate) enum ConnectedClientEvent {
@@ -4,15 +4,8 @@ use nym_task::connections::TransmissionLane;
pub(crate) fn create_input_message(
nym_address: Recipient,
response_packet: Vec<u8>,
mix_hops: Option<u8>,
) -> InputMessage {
let lane = TransmissionLane::General;
let packet_type = None;
InputMessage::new_regular_with_custom_hops(
nym_address,
response_packet,
lane,
packet_type,
mix_hops,
)
InputMessage::new_regular(nym_address, response_packet, lane, packet_type)
}