ipr version revert on develop (#6772)
This commit is contained in:
@@ -14,7 +14,7 @@ use tracing::{debug, error};
|
||||
use nym_ip_packet_requests::response_helpers::{self, IprResponseError};
|
||||
|
||||
use crate::{
|
||||
current::{self, response::IpPacketResponse},
|
||||
current::{request::IpPacketRequest, response::IpPacketResponse},
|
||||
error::{Error, Result},
|
||||
helpers::check_ipr_message_version,
|
||||
};
|
||||
@@ -24,12 +24,8 @@ const IPR_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
enum ConnectionState {
|
||||
Disconnected,
|
||||
Connecting {
|
||||
stream_id: u64,
|
||||
},
|
||||
Connected {
|
||||
stream_id: u64,
|
||||
},
|
||||
Connecting,
|
||||
Connected,
|
||||
#[allow(unused)]
|
||||
Disconnecting,
|
||||
}
|
||||
@@ -55,27 +51,17 @@ impl IprClientConnect {
|
||||
self.mixnet_client
|
||||
}
|
||||
|
||||
pub fn stream_id(&self) -> Option<u64> {
|
||||
match self.connected {
|
||||
ConnectionState::Connecting { stream_id }
|
||||
| ConnectionState::Connected { stream_id } => Some(stream_id),
|
||||
ConnectionState::Disconnected | ConnectionState::Disconnecting => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
if self.connected != ConnectionState::Disconnected {
|
||||
return Err(Error::AlreadyConnected);
|
||||
}
|
||||
|
||||
tracing::info!("Connecting to exit gateway");
|
||||
self.connected = ConnectionState::Connecting;
|
||||
match self.connect_inner(ip_packet_router_address).await {
|
||||
Ok(ips) => {
|
||||
debug!("Successfully connected to the ip-packet-router");
|
||||
let Some(stream_id) = self.stream_id() else {
|
||||
return Err(Error::UnexpectedConnectResponse);
|
||||
};
|
||||
self.connected = ConnectionState::Connected { stream_id };
|
||||
self.connected = ConnectionState::Connected;
|
||||
Ok(ips)
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -88,37 +74,21 @@ impl IprClientConnect {
|
||||
|
||||
async fn connect_inner(&mut self, ip_packet_router_address: Recipient) -> Result<IpPair> {
|
||||
let request_id = self.send_connect_request(ip_packet_router_address).await?;
|
||||
self.connected = ConnectionState::Connecting {
|
||||
stream_id: request_id,
|
||||
};
|
||||
|
||||
debug!("Waiting for reply...");
|
||||
self.listen_for_connect_response(request_id).await
|
||||
}
|
||||
|
||||
async fn send_connect_request(&mut self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = current::new_connect_request(None);
|
||||
tracing::info!(
|
||||
request_id = request_id,
|
||||
protocol_version = request.protocol.version,
|
||||
current_version = crate::current::VERSION,
|
||||
"Sending IPR connect request"
|
||||
);
|
||||
if let Ok(bytes) = request.to_bytes() {
|
||||
let prefix = bytes.get(0..2).unwrap_or(&bytes);
|
||||
let prefix_hex = format!("{:02x?}", prefix);
|
||||
tracing::info!(request_id = request_id, prefix = %prefix_hex, "IPR connect bytes prefix");
|
||||
}
|
||||
async fn send_connect_request(&self, ip_packet_router_address: Recipient) -> Result<u64> {
|
||||
let (request, request_id) = IpPacketRequest::new_connect_request(None);
|
||||
|
||||
// We use 20 surbs for the connect request because typically the IPR is configured to have
|
||||
// a min threshold of 10 surbs that it reserves for itself to request additional surbs.
|
||||
let surbs = 20;
|
||||
let request_bytes = request.to_bytes()?;
|
||||
let framed_bytes = maybe_wrap_stream_frame(request_id, 0, request_bytes);
|
||||
self.mixnet_client
|
||||
.send(create_input_message(
|
||||
ip_packet_router_address,
|
||||
framed_bytes,
|
||||
request,
|
||||
surbs,
|
||||
)?)
|
||||
.await
|
||||
@@ -159,19 +129,13 @@ impl IprClientConnect {
|
||||
for msg in msgs {
|
||||
// Confirm that the version is correct
|
||||
if let Err(err) = check_ipr_message_version(&msg) {
|
||||
let raw: &[u8] = msg.message.as_ref();
|
||||
tracing::warn!(
|
||||
first_byte = raw.first().copied(),
|
||||
expected = crate::current::VERSION,
|
||||
len = raw.len(),
|
||||
"Mixnet message version mismatch: {err}"
|
||||
);
|
||||
tracing::info!("Mixnet message version mismatch: {err}");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Then we deserialize the message
|
||||
tracing::debug!("IprClient: got message while waiting for connect response");
|
||||
let Ok(response) = ipr_response_from_reconstructed_message(&msg) else {
|
||||
let Ok(response) = IpPacketResponse::from_reconstructed_message(&msg) else {
|
||||
// This is ok, it's likely just one of our self-pings
|
||||
tracing::debug!("Failed to deserialize mixnet message");
|
||||
continue;
|
||||
@@ -196,27 +160,14 @@ impl IprClientConnect {
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_wrap_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
if !crate::lp_stream::current_requires_sphinx_stream_transport() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
crate::lp_stream::encode_stream_frame(stream_id, sequence_num, payload)
|
||||
}
|
||||
|
||||
fn ipr_response_from_reconstructed_message(
|
||||
msg: &nym_sdk::mixnet::ReconstructedMessage,
|
||||
) -> std::result::Result<IpPacketResponse, bincode::Error> {
|
||||
let warn_on_unexpected = crate::lp_stream::current_requires_sphinx_stream_transport();
|
||||
let payload =
|
||||
crate::lp_stream::maybe_unwrap_lp_stream_payload(&msg.message, warn_on_unexpected);
|
||||
IpPacketResponse::from_bytes(payload)
|
||||
}
|
||||
|
||||
fn create_input_message(recipient: Recipient, bytes: Vec<u8>, surbs: u32) -> Result<InputMessage> {
|
||||
fn create_input_message(
|
||||
recipient: Recipient,
|
||||
request: IpPacketRequest,
|
||||
surbs: u32,
|
||||
) -> Result<InputMessage> {
|
||||
Ok(InputMessage::new_anonymous(
|
||||
recipient,
|
||||
bytes,
|
||||
request.to_bytes()?,
|
||||
surbs,
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
|
||||
@@ -7,16 +7,18 @@ use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use crate::{current::VERSION as CURRENT_VERSION, error::Result};
|
||||
|
||||
pub(crate) fn check_ipr_message_version(message: &ReconstructedMessage) -> Result<()> {
|
||||
let payload = crate::lp_stream::maybe_unwrap_lp_stream_payload_from_reconstructed(message);
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(payload, CURRENT_VERSION)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
nym_ip_packet_requests::response_helpers::check_ipr_message_version(
|
||||
&message.message,
|
||||
CURRENT_VERSION,
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
IprResponseError::NoVersionByte => crate::Error::NoVersionInMessage,
|
||||
IprResponseError::VersionMismatch { expected, received } if received < expected => {
|
||||
crate::Error::ReceivedResponseWithOldVersion { expected, received }
|
||||
}
|
||||
IprResponseError::VersionMismatch { expected, received } => {
|
||||
crate::Error::ReceivedResponseWithNewVersion { expected, received }
|
||||
}
|
||||
_ => crate::Error::NoVersionInMessage,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,11 +5,10 @@ mod connect;
|
||||
mod error;
|
||||
mod helpers;
|
||||
mod listener;
|
||||
pub mod lp_stream;
|
||||
|
||||
pub use connect::IprClientConnect;
|
||||
pub use error::Error;
|
||||
pub use listener::{IprListener, MixnetMessageOutcome};
|
||||
|
||||
// Re-export the currently used version
|
||||
pub use nym_ip_packet_requests::v9 as current;
|
||||
pub use nym_ip_packet_requests::v8 as current;
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
use bytes::BytesMut;
|
||||
use nym_ip_packet_requests::SPHINX_STREAM_VERSION_THRESHOLD;
|
||||
use nym_lp::packet::frame::{
|
||||
LpFrame, LpFrameHeader, LpFrameKind, SphinxStreamFrameAttributes, SphinxStreamMsgType,
|
||||
};
|
||||
use nym_sdk::mixnet::ReconstructedMessage;
|
||||
use tracing::warn;
|
||||
|
||||
/// Whether the "current" IPR client is operating at a version where the node expects
|
||||
/// non-stream mixnet IPR messages to be LP Stream framed (see `SPHINX_STREAM_VERSION_THRESHOLD`).
|
||||
pub(crate) fn current_requires_sphinx_stream_transport() -> bool {
|
||||
crate::current::VERSION >= SPHINX_STREAM_VERSION_THRESHOLD
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload(data: &[u8], warn_on_unexpected: bool) -> &[u8] {
|
||||
if data.len() < LpFrameHeader::SIZE {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
len = data.len(),
|
||||
header_size = LpFrameHeader::SIZE,
|
||||
"expected LP SphinxStream frame for IPR payload, but message is shorter than LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
let Ok(header) = LpFrameHeader::parse(data) else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
"expected LP SphinxStream frame for IPR payload, but failed to parse LP header; treating as raw payload"
|
||||
);
|
||||
}
|
||||
return data;
|
||||
};
|
||||
|
||||
if header.kind == LpFrameKind::SphinxStream {
|
||||
&data[LpFrameHeader::SIZE..]
|
||||
} else {
|
||||
if warn_on_unexpected {
|
||||
warn!(
|
||||
kind = ?header.kind,
|
||||
"expected LP SphinxStream frame for IPR payload, but got different LP frame kind; treating as raw payload"
|
||||
);
|
||||
}
|
||||
data
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_unwrap_lp_stream_payload_from_reconstructed(message: &ReconstructedMessage) -> &[u8] {
|
||||
maybe_unwrap_lp_stream_payload(&message.message, false)
|
||||
}
|
||||
|
||||
pub fn encode_stream_frame(stream_id: u64, sequence_num: u32, payload: Vec<u8>) -> Vec<u8> {
|
||||
let attrs = SphinxStreamFrameAttributes {
|
||||
stream_id,
|
||||
msg_type: SphinxStreamMsgType::Data,
|
||||
sequence_num,
|
||||
};
|
||||
let frame = LpFrame::new_stream(attrs, payload);
|
||||
let mut buf = BytesMut::with_capacity(LpFrameHeader::SIZE + frame.content.len());
|
||||
frame.encode(&mut buf);
|
||||
buf.to_vec()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use nym_lp::packet::frame::SphinxStreamFrameAttributes;
|
||||
|
||||
#[test]
|
||||
fn stream_frame_roundtrip_unwraps_payload() {
|
||||
let stream_id = 0x0123_4567_89ab_cdef;
|
||||
let seq = 42u32;
|
||||
let payload = b"hello-ipr".to_vec();
|
||||
|
||||
let framed = encode_stream_frame(stream_id, seq, payload.clone());
|
||||
|
||||
let header = LpFrameHeader::parse(&framed).expect("valid lp header");
|
||||
assert_eq!(header.kind, LpFrameKind::SphinxStream);
|
||||
|
||||
let attrs =
|
||||
SphinxStreamFrameAttributes::parse(&header.frame_attributes).expect("valid attrs");
|
||||
assert_eq!(attrs.stream_id, stream_id);
|
||||
assert_eq!(attrs.sequence_num, seq);
|
||||
assert_eq!(attrs.msg_type, SphinxStreamMsgType::Data);
|
||||
|
||||
let unwrapped = maybe_unwrap_lp_stream_payload(&framed, false);
|
||||
assert_eq!(unwrapped, payload.as_slice());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unwrap_noops_on_non_stream_or_malformed_data() {
|
||||
let raw = b"\x09\x00\x01\x02\x03";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(raw, false), raw);
|
||||
|
||||
// malformed header: not enough bytes for LP header
|
||||
let short = b"\x00\x01";
|
||||
assert_eq!(maybe_unwrap_lp_stream_payload(short, false), short);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user