Merge pull request #6710 from nymtech/bdq/versioning-fix
This commit is contained in:
Generated
+1
@@ -7028,6 +7028,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"futures",
|
||||
"nym-ip-packet-requests",
|
||||
"nym-lp",
|
||||
"nym-sdk",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
|
||||
@@ -12,6 +12,26 @@ pub mod v7;
|
||||
pub mod v8;
|
||||
pub mod v9;
|
||||
|
||||
/// Highest IPR protocol version that is allowed to be sent as a **non-stream** mixnet payload
|
||||
/// (i.e. not wrapped in `LpFrameKind::SphinxStream`).
|
||||
pub const MAX_NON_STREAM_VERSION: u8 = v8::VERSION;
|
||||
|
||||
/// First IPR protocol version that **requires** the SphinxStream (LP) transport for non-stream
|
||||
/// mixnet sends, matching the node-side enforcement in `ip-packet-router`.
|
||||
pub const SPHINX_STREAM_VERSION_THRESHOLD: u8 = v9::VERSION;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn stream_transport_threshold_is_consistent() {
|
||||
assert_eq!(MAX_NON_STREAM_VERSION, 8);
|
||||
assert_eq!(SPHINX_STREAM_VERSION_THRESHOLD, 9);
|
||||
assert!(SPHINX_STREAM_VERSION_THRESHOLD > MAX_NON_STREAM_VERSION);
|
||||
}
|
||||
}
|
||||
|
||||
// version 3: initial version
|
||||
// version 4: IPv6 support
|
||||
// version 5: Add severity level to info response
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_ip_packet_requests::v8::response::{
|
||||
use nym_ip_packet_client::current::response::{
|
||||
ControlResponse, DataResponse, InfoLevel, IpPacketResponse, IpPacketResponseData,
|
||||
};
|
||||
use nym_ip_packet_client::lp_stream;
|
||||
use nym_sdk::{
|
||||
DebugConfig, NymApiTopologyProvider, NymApiTopologyProviderConfig, NymNetworkDetails,
|
||||
TopologyProvider, mixnet::ReconstructedMessage,
|
||||
@@ -32,7 +33,10 @@ pub fn mixnet_debug_config(
|
||||
}
|
||||
|
||||
pub fn unpack_data_response(reconstructed_message: &ReconstructedMessage) -> Option<DataResponse> {
|
||||
match IpPacketResponse::from_reconstructed_message(reconstructed_message) {
|
||||
let payload =
|
||||
lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(reconstructed_message);
|
||||
|
||||
match IpPacketResponse::from_bytes(payload) {
|
||||
Ok(response) => match response.data {
|
||||
IpPacketResponseData::Data(data_response) => Some(data_response),
|
||||
IpPacketResponseData::Control(control) => match *control {
|
||||
|
||||
@@ -9,7 +9,7 @@ use nym_connection_monitor::{
|
||||
wrap_icmp_in_ipv6,
|
||||
},
|
||||
};
|
||||
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec, v8::request::IpPacketRequest};
|
||||
use nym_ip_packet_requests::{IpPair, codec::MultiIpPacketCodec};
|
||||
use nym_sdk::mixnet::{
|
||||
InputMessage, MixnetClient, MixnetMessageSender, Recipient, TransmissionLane,
|
||||
};
|
||||
@@ -25,6 +25,7 @@ pub async fn send_ping_v4(
|
||||
sequence_number: u16,
|
||||
destination: Ipv4Addr,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let icmp_identifier = icmp_identifier();
|
||||
let icmp_echo_request = create_icmpv4_echo_request(sequence_number, icmp_identifier)?;
|
||||
@@ -35,7 +36,12 @@ pub async fn send_ping_v4(
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv4_packet.packet().to_vec().into());
|
||||
|
||||
// Wrap into a mixnet input message addressed to the IPR
|
||||
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
|
||||
let mixnet_message = create_input_message(
|
||||
exit_router_address,
|
||||
bundled_packet,
|
||||
stream_id,
|
||||
sequence_number,
|
||||
)?;
|
||||
|
||||
mixnet_client.send(mixnet_message).await?;
|
||||
Ok(())
|
||||
@@ -47,6 +53,7 @@ pub async fn send_ping_v6(
|
||||
sequence_number: u16,
|
||||
destination: Ipv6Addr,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let icmp_identifier = icmp_identifier();
|
||||
let icmp_echo_request = create_icmpv6_echo_request(
|
||||
@@ -62,7 +69,12 @@ pub async fn send_ping_v6(
|
||||
MultiIpPacketCodec::bundle_one_packet(ipv6_packet.packet().to_vec().into());
|
||||
|
||||
// Wrap into a mixnet input message addressed to the IPR
|
||||
let mixnet_message = create_input_message(exit_router_address, bundled_packet)?;
|
||||
let mixnet_message = create_input_message(
|
||||
exit_router_address,
|
||||
bundled_packet,
|
||||
stream_id,
|
||||
sequence_number,
|
||||
)?;
|
||||
|
||||
// Send across the mixnet
|
||||
mixnet_client.send(mixnet_message).await?;
|
||||
@@ -72,15 +84,22 @@ pub async fn send_ping_v6(
|
||||
fn create_input_message(
|
||||
recipient: impl Into<Recipient>,
|
||||
bundled_packets: Bytes,
|
||||
stream_id: u64,
|
||||
sequence_number: u16,
|
||||
) -> anyhow::Result<InputMessage> {
|
||||
let packet = IpPacketRequest::new_data_request(bundled_packets).to_bytes()?;
|
||||
let packet = nym_ip_packet_client::current::new_data_request(bundled_packets).to_bytes()?;
|
||||
let framed_packet = nym_ip_packet_client::lp_stream::encode_stream_frame(
|
||||
stream_id,
|
||||
sequence_number as u32,
|
||||
packet,
|
||||
);
|
||||
|
||||
let lane = TransmissionLane::General;
|
||||
let packet_type = None;
|
||||
let surbs = 0;
|
||||
Ok(InputMessage::new_anonymous(
|
||||
recipient.into(),
|
||||
packet,
|
||||
framed_packet,
|
||||
surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
|
||||
@@ -256,8 +256,8 @@ pub async fn do_ping(
|
||||
let (maybe_ip_pair, mut mixnet_client) =
|
||||
connect_exit(mixnet_client, exit_router_address).await;
|
||||
match maybe_ip_pair {
|
||||
Some(ip_pair) => (
|
||||
do_ping_exit(&mut mixnet_client, ip_pair, exit_router_address).await,
|
||||
Some((ip_pair, stream_id)) => (
|
||||
do_ping_exit(&mut mixnet_client, ip_pair, stream_id, exit_router_address).await,
|
||||
mixnet_client,
|
||||
),
|
||||
None => (Ok(Some(Exit::fail_to_connect())), mixnet_client),
|
||||
@@ -304,7 +304,7 @@ async fn do_ping_entry(
|
||||
async fn connect_exit(
|
||||
mixnet_client: MixnetClient,
|
||||
exit_router_address: Recipient,
|
||||
) -> (Option<IpPair>, MixnetClient) {
|
||||
) -> (Option<(IpPair, u64)>, MixnetClient) {
|
||||
// Step 2: connect to the exit gateway
|
||||
info!(
|
||||
"Connecting to exit gateway: {}",
|
||||
@@ -315,12 +315,19 @@ async fn connect_exit(
|
||||
let mut ipr_client = IprClientConnect::new(mixnet_client, cancel_token);
|
||||
|
||||
let maybe_ip_pair = ipr_client.connect(exit_router_address).await;
|
||||
let stream_id = ipr_client.stream_id();
|
||||
let mixnet_client = ipr_client.into_mixnet_client();
|
||||
|
||||
if let Ok(our_ips) = maybe_ip_pair {
|
||||
info!("Successfully connected to exit gateway");
|
||||
info!("Using mixnet VPN IP addresses: {our_ips}");
|
||||
(Some(our_ips), mixnet_client)
|
||||
let Some(stream_id) = stream_id else {
|
||||
tracing::warn!(
|
||||
"No active IPR stream id set after connect; cannot run IPR data-plane tests"
|
||||
);
|
||||
return (None, mixnet_client);
|
||||
};
|
||||
(Some((our_ips, stream_id)), mixnet_client)
|
||||
} else {
|
||||
(None, mixnet_client)
|
||||
}
|
||||
@@ -329,10 +336,11 @@ async fn connect_exit(
|
||||
pub async fn do_ping_exit(
|
||||
mixnet_client: &mut MixnetClient,
|
||||
our_ips: IpPair,
|
||||
stream_id: u64,
|
||||
exit_router_address: Recipient,
|
||||
) -> anyhow::Result<Option<Exit>> {
|
||||
// Step 3: perform ICMP connectivity checks for the exit gateway
|
||||
send_icmp_pings(mixnet_client, our_ips, exit_router_address).await?;
|
||||
send_icmp_pings(mixnet_client, our_ips, exit_router_address, stream_id).await?;
|
||||
listen_for_icmp_ping_replies(mixnet_client, our_ips).await
|
||||
}
|
||||
|
||||
@@ -340,6 +348,7 @@ async fn send_icmp_pings(
|
||||
mixnet_client: &MixnetClient,
|
||||
our_ips: IpPair,
|
||||
exit_router_address: Recipient,
|
||||
stream_id: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
// ipv4 addresses for testing
|
||||
let ipr_tun_ip_v4 = NYM_TUN_DEVICE_ADDRESS_V4;
|
||||
@@ -361,6 +370,7 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
ipr_tun_ip_v4,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
icmp::send_ping_v4(
|
||||
@@ -369,6 +379,7 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
external_ip_v4,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -381,6 +392,7 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
ipr_tun_ip_v6,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
icmp::send_ping_v6(
|
||||
@@ -389,6 +401,7 @@ async fn send_icmp_pings(
|
||||
ii,
|
||||
external_ip_v6,
|
||||
exit_router_address,
|
||||
stream_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -26,3 +26,4 @@ tracing.workspace = true
|
||||
|
||||
nym-sdk = { workspace = true }
|
||||
nym-ip-packet-requests = { workspace = true }
|
||||
nym-lp = { workspace = true }
|
||||
|
||||
@@ -14,7 +14,7 @@ use tracing::{debug, error};
|
||||
use nym_ip_packet_requests::response_helpers::{self, IprResponseError};
|
||||
|
||||
use crate::{
|
||||
current::{request::IpPacketRequest, response::IpPacketResponse},
|
||||
current::{self, response::IpPacketResponse},
|
||||
error::{Error, Result},
|
||||
helpers::check_ipr_message_version,
|
||||
};
|
||||
@@ -24,8 +24,12 @@ const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected,
|
||||
Connecting {
|
||||
stream_id: u64,
|
||||
},
|
||||
Connected {
|
||||
stream_id: u64,
|
||||
},
|
||||
#[allow(unused)]
|
||||
Disconnecting,
|
||||
}
|
||||
@@ -51,17 +55,27 @@ impl IprClientConnect {
|
||||
self.mixnet_client
|
||||
}
|
||||
|
||||
pub fn stream_id(&self) -> Option<u64> {
|
||||
match self.connected {
|
||||
ConnectionState::Connecting { stream_id }
|
||||
| ConnectionState::Connected { stream_id } => Some(stream_id),
|
||||
ConnectionState::Disconnected | ConnectionState::Disconnecting => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
if self.connected != ConnectionState::Disconnected {
|
||||
return Err(Error::AlreadyConnected);
|
||||
}
|
||||
|
||||
tracing::info!("Connecting to exit gateway");
|
||||
self.connected = ConnectionState::Connecting;
|
||||
match self.connect_inner(ip_packet_router_address).await {
|
||||
Ok(ips) => {
|
||||
debug!("Successfully connected to the ip-packet-router");
|
||||
self.connected = ConnectionState::Connected;
|
||||
let Some(stream_id) = self.stream_id() else {
|
||||
return Err(Error::UnexpectedConnectResponse);
|
||||
};
|
||||
self.connected = ConnectionState::Connected { stream_id };
|
||||
Ok(ips)
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -74,21 +88,37 @@ impl IprClientConnect {
|
||||
|
||||
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
let request_id = self.send_connect_request(ip_packet_router_address).await?;
|
||||
self.connected = ConnectionState::Connecting {
|
||||
stream_id: request_id,
|
||||
};
|
||||
|
||||
debug!("Waiting for reply...");
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = current::new_connect_request(None);
|
||||
tracing::info!(
|
||||
request_id = request_id,
|
||||
protocol_version = request.protocol.version,
|
||||
current_version = crate::current::VERSION,
|
||||
"Sending IPR connect request"
|
||||
);
|
||||
if let Ok(bytes) = request.to_bytes() {
|
||||
let prefix = bytes.get(0..2).unwrap_or(&bytes);
|
||||
let prefix_hex = format!("{:02x?}", prefix);
|
||||
tracing::info!(request_id = request_id, prefix = %prefix_hex, "IPR connect bytes prefix");
|
||||
}
|
||||
|
||||
// We use 20 surbs for the connect request because typically the IPR is configured to have
|
||||
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
|
||||
let surbs = 20;
|
||||
let request_bytes = request.to_bytes()?;
|
||||
let framed_bytes = maybe_wrap_stream_frame(request_id, 0, request_bytes);
|
||||
self.mixnet_client
|
||||
.send(create_input_message(
|
||||
ip_packet_router_address,
|
||||
request,
|
||||
framed_bytes,
|
||||
surbs,
|
||||
)?)
|
||||
.await
|
||||
@@ -129,13 +159,19 @@ impl IprClientConnect {
|
||||
for msg in msgs {
|
||||
// Confirm that the version is correct
|
||||
if let Err(err) = check_ipr_message_version(&msg) {
|
||||
tracing::info!("Mixnet message version mismatch: {err}");
|
||||
let raw: &[u8] = msg.message.as_ref();
|
||||
tracing::warn!(
|
||||
first_byte = raw.first().copied(),
|
||||
expected = crate::current::VERSION,
|
||||
len = raw.len(),
|
||||
"Mixnet message version mismatch: {err}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Then we deserialize the message
|
||||
tracing::debug!("IprClient: got message while waiting for connect response");
|
||||
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
|
||||
let Ok(response) = ipr_response_from_reconstructed_message(&msg) else {
|
||||
// This is ok, it's likely just one of our self-pings
|
||||
tracing::debug!("Failed to deserialize mixnet message");
|
||||
continue;
|
||||
@@ -160,14 +196,27 @@ impl IprClientConnect {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_input_message(
|
||||
recipient: Recipient,
|
||||
request: IpPacketRequest,
|
||||
surbs: u32,
|
||||
) -> Result<InputMessage> {
|
||||
fn maybe_wrap_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
if !crate::lp_stream::current_requires_sphinx_stream_transport() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
crate::lp_stream::encode_stream_frame(stream_id, sequence_num, payload)
|
||||
}
|
||||
|
||||
fn ipr_response_from_reconstructed_message(
|
||||
msg: &nym_sdk::mixnet::ReconstructedMessage,
|
||||
) -> std::result::Result<IpPacketResponse, bincode::Error> {
|
||||
let warn_on_unexpected = crate::lp_stream::current_requires_sphinx_stream_transport();
|
||||
let payload =
|
||||
crate::lp_stream::maybe_unwrap_lp_stream_payload(&msg.message, warn_on_unexpected);
|
||||
IpPacketResponse::from_bytes(payload)
|
||||
}
|
||||
|
||||
fn create_input_message(recipient: Recipient, bytes: Vec<u8>, surbs: u32) -> Result<InputMessage> {
|
||||
Ok(InputMessage::new_anonymous(
|
||||
recipient,
|
||||
request.to_bytes()?,
|
||||
bytes,
|
||||
surbs,
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
|
||||
@@ -7,18 +7,16 @@ use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use crate::{current::VERSION as CURRENT_VERSION, error::Result};
|
||||
|
||||
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(
|
||||
&message.message,
|
||||
CURRENT_VERSION,
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
let payload = crate::lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(message);
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(payload, CURRENT_VERSION)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ mod connect;
|
||||
mod error;
|
||||
mod helpers;
|
||||
mod listener;
|
||||
pub mod lp_stream;
|
||||
|
||||
pub use connect::IprClientConnect;
|
||||
pub use error::Error;
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
use bytes::BytesMut;
|
||||
use nym_ip_packet_requests::SPHINX_STREAM_VERSION_THRESHOLD;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrame, LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes, SphinxStreamMsgType,
|
||||
};
|
||||
use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use tracing::warn;
|
||||
|
||||
/// Whether the "current" IPR client is operating at a version where the node expects
|
||||
/// non-stream mixnet IPR messages to be LP Stream framed (see `SPHINX_STREAM_VERSION_THRESHOLD`).
|
||||
pub(crate) fn current_requires_sphinx_stream_transport() -> bool {
|
||||
crate::current::VERSION >= SPHINX_STREAM_VERSION_THRESHOLD
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload(data: &[u8], warn_on_unexpected: bool) -> &[u8] {
|
||||
if data.len() < LpFrameHeader::SIZE {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
len = data.len(),
|
||||
header_size = LpFrameHeader::SIZE,
|
||||
"expected LP SphinxStream frame for IPR payload, but message is shorter than LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
let Ok(header) = LpFrameHeader::parse(data) else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
"expected LP SphinxStream frame for IPR payload, but failed to parse LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
};
|
||||
|
||||
if header.kind == LpFrameKind::SphinxStream {
|
||||
&data[LpFrameHeader::SIZE..]
|
||||
} else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
kind = ?header.kind,
|
||||
"expected LP SphinxStream frame for IPR payload, but got different LP frame kind; treating as raw payload"
|
||||
);
|
||||
}
|
||||
data
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload_from_reconstructed(message: &ReconstructedMessage) -> &[u8] {
|
||||
maybe_unwrap_lp_stream_payload(&message.message, false)
|
||||
}
|
||||
|
||||
pub fn encode_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
let attrs = SphinxStreamFrameAttributes {
|
||||
stream_id,
|
||||
msg_type: SphinxStreamMsgType::Data,
|
||||
sequence_num,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, payload);
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
|
||||
frame.encode(&mut buf);
|
||||
buf.to_vec()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use nym_lp::packet::frame::SphinxStreamFrameAttributes;
|
||||
|
||||
#[test]
|
||||
fn stream_frame_roundtrip_unwraps_payload() {
|
||||
let stream_id = 0x0123_4567_89ab_cdef;
|
||||
let seq = 42u32;
|
||||
let payload = b"hello-ipr".to_vec();
|
||||
|
||||
let framed = encode_stream_frame(stream_id, seq, payload.clone());
|
||||
|
||||
let header = LpFrameHeader::parse(&framed).expect("valid lp header");
|
||||
assert_eq!(header.kind, LpFrameKind::SphinxStream);
|
||||
|
||||
let attrs =
|
||||
SphinxStreamFrameAttributes::parse(&header.frame_attributes).expect("valid attrs");
|
||||
assert_eq!(attrs.stream_id, stream_id);
|
||||
assert_eq!(attrs.sequence_num, seq);
|
||||
assert_eq!(attrs.msg_type, SphinxStreamMsgType::Data);
|
||||
|
||||
let unwrapped = maybe_unwrap_lp_stream_payload(&framed, false);
|
||||
assert_eq!(unwrapped, payload.as_slice());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unwrap_noops_on_non_stream_or_malformed_data() {
|
||||
let raw = b"\x09\x00\x01\x02\x03";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(raw, false), raw);
|
||||
|
||||
// malformed header: not enough bytes for LP header
|
||||
let short = b"\x00\x01";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(short, false), short);
|
||||
}
|
||||
}
|
||||
@@ -9,11 +9,9 @@ use crate::ip_packet_client::{
|
||||
use crate::mixnet::{MixnetClient, MixnetStream, Recipient};
|
||||
use crate::Error;
|
||||
use bytes::Bytes;
|
||||
use current_ipr::response::IpPacketResponse;
|
||||
use nym_ip_packet_requests::response_helpers;
|
||||
use nym_ip_packet_requests::{
|
||||
v9::{self, response::IpPacketResponse},
|
||||
IpPair,
|
||||
};
|
||||
use nym_ip_packet_requests::{v9 as current_ipr, IpPair};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use std::time::Duration;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -107,7 +105,7 @@ impl IpMixStream {
|
||||
}
|
||||
|
||||
async fn connect_tunnel(stream: &mut MixnetStream) -> Result<IpPair, Error> {
|
||||
let (request, request_id) = v9::new_connect_request(None);
|
||||
let (request, request_id) = current_ipr::new_connect_request(None);
|
||||
debug!("Sending connect request with ID: {}", request_id);
|
||||
|
||||
let request_bytes = request.to_bytes()?;
|
||||
@@ -146,7 +144,7 @@ impl IpMixStream {
|
||||
/// Send an IP packet through the tunnel.
|
||||
pub async fn send_ip_packet(&mut self, packet: &[u8]) -> Result<(), Error> {
|
||||
self.check_connected()?;
|
||||
let request = v9::new_data_request(packet.to_vec().into());
|
||||
let request = current_ipr::new_data_request(packet.to_vec().into());
|
||||
let request_bytes = request.to_bytes()?;
|
||||
self.stream
|
||||
.write_all(&request_bytes)
|
||||
|
||||
@@ -218,7 +218,10 @@ fn create_ip_packet_response(
|
||||
ClientVersion::V6 => IpPacketResponseV6::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V7 => IpPacketResponseV7::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V8 => IpPacketResponseV8::new_ip_packet(packets).to_bytes(),
|
||||
ClientVersion::V9 => v9::new_ip_packet_response(packets).to_bytes(),
|
||||
ClientVersion::V9 => {
|
||||
let resp = v9::new_ip_packet_response(packets);
|
||||
resp.to_bytes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use nym_ip_packet_requests::{
|
||||
IpPair, v6::request::IpPacketRequest as IpPacketRequestV6,
|
||||
v7::request::IpPacketRequest as IpPacketRequestV7,
|
||||
v8::request::IpPacketRequest as IpPacketRequestV8,
|
||||
v9::request::IpPacketRequest as IpPacketRequestV9,
|
||||
};
|
||||
use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
|
||||
@@ -131,14 +132,14 @@ impl TryFrom<&ReconstructedMessage> for IpPacketRequest {
|
||||
Ok(IpPacketRequest::from((request_v8, sender_tag)))
|
||||
}
|
||||
9 => {
|
||||
let request_v8 = IpPacketRequestV8::from_reconstructed_message(reconstructed)
|
||||
let request_v9 = IpPacketRequestV9::from_reconstructed_message(reconstructed)
|
||||
.map_err(
|
||||
|source| IpPacketRouterError::FailedToDeserializeTaggedPacket { source },
|
||||
)?;
|
||||
let sender_tag = reconstructed
|
||||
.sender_tag
|
||||
.ok_or(IpPacketRouterError::MissingSenderTag)?;
|
||||
Ok(v9::convert(request_v8, sender_tag))
|
||||
Ok(v9::convert(request_v9, sender_tag))
|
||||
}
|
||||
_ => {
|
||||
log::info!("Received packet with invalid version: v{request_version}");
|
||||
|
||||
@@ -130,7 +130,12 @@ impl VersionedResponse {
|
||||
ClientVersion::V6 => IpPacketResponseV6::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V7 => IpPacketResponseV7::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V8 => IpPacketResponseV8::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V9 => IpPacketResponseV8::try_from(self)?.to_bytes(),
|
||||
ClientVersion::V9 => {
|
||||
let mut resp = IpPacketResponseV8::try_from(self)?;
|
||||
resp.version = nym_ip_packet_requests::v9::VERSION;
|
||||
let bytes = resp.to_bytes();
|
||||
bytes
|
||||
}
|
||||
}
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::{
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use nym_ip_packet_requests::codec::MultiIpPacketCodec;
|
||||
use nym_ip_packet_requests::{MAX_NON_STREAM_VERSION, SPHINX_STREAM_VERSION_THRESHOLD};
|
||||
use nym_lp::packet::frame::{LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes};
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
@@ -559,8 +560,9 @@ impl MixnetListener {
|
||||
///
|
||||
/// # Version / transport enforcement
|
||||
///
|
||||
/// - LP Stream frames (`stream_id` is `Some`) **must** carry v9+ payloads.
|
||||
/// - Non-stream messages (`stream_id` is `None`) **must** be v8 or lower.
|
||||
/// - LP Stream frames (`stream_id` is `Some`) **must** carry payloads with version
|
||||
/// `>= SPHINX_STREAM_VERSION_THRESHOLD` (see `nym_ip_packet_requests`).
|
||||
/// - Non-stream messages (`stream_id` is `None`) **must** be `<= MAX_NON_STREAM_VERSION`.
|
||||
///
|
||||
/// Messages that violate these rules are dropped.
|
||||
async fn on_ipr_message(
|
||||
@@ -578,16 +580,22 @@ impl MixnetListener {
|
||||
}?;
|
||||
|
||||
// Enforce version/transport consistency:
|
||||
// - LP Stream frames must carry v9+ payloads
|
||||
// - Non-stream messages must be v8 or lower
|
||||
// - LP Stream frames must carry payloads at/above the SphinxStream threshold
|
||||
// - Non-stream messages must be at/below the max non-stream version
|
||||
let version_num = request.version().into_u8();
|
||||
|
||||
if stream_id.is_some() && version_num < 9 {
|
||||
log::warn!("LP Stream frame contains v{version_num} payload, expected v9+; dropping",);
|
||||
if stream_id.is_some() && version_num < SPHINX_STREAM_VERSION_THRESHOLD {
|
||||
log::warn!(
|
||||
"LP Stream frame contains v{version_num} payload, expected v{expected}+; dropping",
|
||||
expected = SPHINX_STREAM_VERSION_THRESHOLD
|
||||
);
|
||||
return Ok(vec![]);
|
||||
}
|
||||
if stream_id.is_none() && version_num >= 9 {
|
||||
log::warn!("Non-stream message claims v{version_num}, expected v8 or lower; dropping",);
|
||||
if stream_id.is_none() && version_num > MAX_NON_STREAM_VERSION {
|
||||
log::warn!(
|
||||
"Non-stream message claims v{version_num}, expected v{expected} or lower; dropping",
|
||||
expected = MAX_NON_STREAM_VERSION
|
||||
);
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user