Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d1ba5acd72 | |||
| b7b238584d | |||
| 7ed5b5477b | |||
| 09506683ce | |||
| 98f1480ae8 | |||
| 05b6f5e282 |
@@ -15,6 +15,9 @@ env:
|
||||
jobs:
|
||||
publish-dry-run:
|
||||
runs-on: arc-linux-latest
|
||||
timeout-minutes: 35
|
||||
env:
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v6
|
||||
@@ -59,20 +62,60 @@ jobs:
|
||||
- name: Bump versions (local only)
|
||||
run: |
|
||||
cargo workspaces version custom ${{ inputs.version }} \
|
||||
--allow-branch ${{ github.ref_name }} \
|
||||
--no-git-commit \
|
||||
--yes
|
||||
|
||||
- name: Preflight publish checks
|
||||
run: |
|
||||
python3 tools/internal/check_publish_preflight.py
|
||||
|
||||
# Dry run may show cascading dependency errors because packages aren't
|
||||
# actually uploaded - these are expected and ignored. We check for real
|
||||
# errors like packaging failures, missing metadata, or invalid Cargo.toml.
|
||||
- name: Publish (dry run)
|
||||
run: |
|
||||
output=$(cargo workspaces publish --dry-run --allow-dirty 2>&1) || true
|
||||
echo "$output"
|
||||
set +e
|
||||
publish_status=1
|
||||
max_attempts=2
|
||||
attempt=1
|
||||
rm -f /tmp/publish-dry-run.log
|
||||
|
||||
# Check for real errors (not cascading dependency errors)
|
||||
# Cascading errors mention "crates.io index", real errors mention "Cargo.toml"
|
||||
echo "$output" | grep -i "Cargo.toml" && exit 1 || true
|
||||
while [ "$attempt" -le "$max_attempts" ]; do
|
||||
echo "Dry-run publish attempt ${attempt}/${max_attempts}"
|
||||
cargo workspaces publish --dry-run --allow-dirty 2>&1 | tee /tmp/publish-dry-run.log
|
||||
publish_status=${PIPESTATUS[0]}
|
||||
|
||||
if [ "$publish_status" -eq 0 ]; then
|
||||
break
|
||||
fi
|
||||
|
||||
# Retry once for interruption/runner issues.
|
||||
if [ "$attempt" -lt "$max_attempts" ] && \
|
||||
{ [ "$publish_status" -eq 130 ] || [ "$publish_status" -eq 137 ]; }; then
|
||||
echo "Publish dry-run interrupted (exit ${publish_status}), retrying in 10s..."
|
||||
sleep 10
|
||||
attempt=$((attempt + 1))
|
||||
continue
|
||||
fi
|
||||
|
||||
break
|
||||
done
|
||||
set -e
|
||||
|
||||
if grep -Eiq \
|
||||
"failed to verify manifest|failed to parse manifest|invalid Cargo.toml|error: package .* has no (description|license|repository)" \
|
||||
/tmp/publish-dry-run.log; then
|
||||
echo "Detected real packaging/manifest errors"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# In dry-run mode, non-zero publish status is expected due to
|
||||
# dependency-cascade failures against crates.io index.
|
||||
if [ "$publish_status" -ne 0 ]; then
|
||||
echo "Dry-run publish returned non-zero (${publish_status}) but no real manifest blockers were detected."
|
||||
fi
|
||||
|
||||
echo "Only expected dry-run dependency cascade errors detected (if any)."
|
||||
|
||||
# Show the list of packages published
|
||||
- name: Show package versions
|
||||
|
||||
@@ -17,6 +17,8 @@ on:
|
||||
jobs:
|
||||
publish:
|
||||
runs-on: arc-linux-latest
|
||||
env:
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v6
|
||||
|
||||
@@ -17,6 +17,8 @@ on:
|
||||
jobs:
|
||||
publish:
|
||||
runs-on: arc-linux-latest
|
||||
env:
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v6
|
||||
|
||||
@@ -15,6 +15,8 @@ env:
|
||||
jobs:
|
||||
version-bump:
|
||||
runs-on: arc-linux-latest
|
||||
env:
|
||||
RUSTUP_PERMIT_COPY_RENAME: 1
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
|
||||
@@ -25,6 +25,10 @@ jobs:
|
||||
- name: Install cargo-workspaces
|
||||
run: cargo install cargo-workspaces
|
||||
|
||||
- name: Preflight publish checks
|
||||
run: |
|
||||
python3 tools/internal/check_publish_preflight.py
|
||||
|
||||
- name: Publish remaining crates
|
||||
env:
|
||||
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
[package]
|
||||
name = "nym-kkt-ciphersuite"
|
||||
description = "Nym KKT ciphersuite"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -48,7 +48,7 @@ pub(crate) fn encrypt_lp_packet(
|
||||
) -> Result<EncryptedLpPacket, LpError> {
|
||||
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.message().len());
|
||||
packet.header().inner.encode(&mut plaintext);
|
||||
packet.message().encode_content(&mut plaintext);
|
||||
packet.message().encode(&mut plaintext);
|
||||
|
||||
let ciphertext = encrypt_data(plaintext.as_ref(), transport)?;
|
||||
|
||||
@@ -67,7 +67,7 @@ pub(crate) fn decrypt_lp_packet(
|
||||
|
||||
let inner_header = InnerHeader::parse(&plaintext)?;
|
||||
let payload = &plaintext[InnerHeader::SIZE..];
|
||||
let message = LpMessage::decode_content(payload, inner_header.message_type)?;
|
||||
let message = LpMessage::decode(payload)?;
|
||||
|
||||
Ok(LpPacket::new(
|
||||
LpHeader {
|
||||
@@ -82,7 +82,7 @@ pub(crate) fn decrypt_lp_packet(
|
||||
mod tests {
|
||||
use crate::LpError;
|
||||
use crate::codec::{decrypt_data, decrypt_lp_packet, encrypt_data, encrypt_lp_packet};
|
||||
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket, MessageType};
|
||||
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
|
||||
use crate::peer::mock_peers;
|
||||
use crate::psq::initiator::{build_psq_ciphersuite, build_psq_principal};
|
||||
use crate::psq::{PSQ_MSG2_SIZE, psq_msg1_size, responder};
|
||||
@@ -259,7 +259,10 @@ mod tests {
|
||||
let (mut init_transport, mut resp_transport) = mock_transport();
|
||||
|
||||
// happy path
|
||||
let packet = LpPacket::new(LpHeader::new(123, 0, 1, MessageType::Busy), LpMessage::Busy);
|
||||
let packet = LpPacket::new(
|
||||
LpHeader::new(123, 0, 1),
|
||||
LpMessage::new_opaque(b"foomp".to_vec()),
|
||||
);
|
||||
|
||||
let ciphertext = encrypt_lp_packet(packet.clone(), &mut init_transport).unwrap();
|
||||
assert_eq!(packet.header().outer, ciphertext.outer_header());
|
||||
@@ -268,7 +271,10 @@ mod tests {
|
||||
assert_eq!(packet, plaintext);
|
||||
|
||||
// incomplete ciphertext
|
||||
let packet = LpPacket::new(LpHeader::new(123, 1, 1, MessageType::Busy), LpMessage::Busy);
|
||||
let packet = LpPacket::new(
|
||||
LpHeader::new(123, 1, 1),
|
||||
LpMessage::new_opaque(b"foomp".to_vec()),
|
||||
);
|
||||
let ciphertext2 = encrypt_lp_packet(packet, &mut init_transport).unwrap();
|
||||
let l = ciphertext2.ciphertext().len();
|
||||
let malformed_content = ciphertext2.ciphertext()[..l - 1].to_vec();
|
||||
@@ -277,7 +283,10 @@ mod tests {
|
||||
assert!(matches!(dec_err, LpError::PSQSessionFailure { .. }));
|
||||
|
||||
// too small buffer
|
||||
let packet = LpPacket::new(LpHeader::new(123, 1, 1, MessageType::Busy), LpMessage::Busy);
|
||||
let packet = LpPacket::new(
|
||||
LpHeader::new(123, 1, 1),
|
||||
LpMessage::new_opaque(b"foomp".to_vec()),
|
||||
);
|
||||
let ciphertext3 = encrypt_lp_packet(packet, &mut resp_transport).unwrap();
|
||||
let malformed = EncryptedLpPacket::new(ciphertext3.outer_header(), vec![]);
|
||||
let dec_err = decrypt_lp_packet(malformed, &mut init_transport).unwrap_err();
|
||||
|
||||
@@ -11,8 +11,8 @@ pub enum MalformedLpPacketError {
|
||||
#[error("provided insufficient data to fully deserialise the struct")]
|
||||
InsufficientData,
|
||||
|
||||
#[error("{0} is not a valid MessageType")]
|
||||
InvalidMessageType(u32),
|
||||
#[error("{0} is not a valid LpDataKind")]
|
||||
InvalidLpDataKind(u16),
|
||||
|
||||
#[error("invalid payload size: expected {expected}, got {actual}")]
|
||||
InvalidPayloadSize { expected: usize, actual: usize },
|
||||
@@ -27,7 +27,7 @@ pub enum MalformedLpPacketError {
|
||||
}
|
||||
|
||||
impl MalformedLpPacketError {
|
||||
pub fn invalid_message_type(message_type: u32) -> Self {
|
||||
MalformedLpPacketError::InvalidMessageType(message_type)
|
||||
pub fn invalid_data_kind(message_type: u16) -> Self {
|
||||
MalformedLpPacketError::InvalidLpDataKind(message_type)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::packet::message::MessageType;
|
||||
use crate::packet::version;
|
||||
use crate::{packet::error::MalformedLpPacketError, peer_config::LpReceiverIndex};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
// use nym_lp::peer_config::LpReceiverIndex;
|
||||
use tracing::warn;
|
||||
|
||||
/// Outer header (12 bytes) - always cleartext, used for routing.
|
||||
@@ -58,11 +56,10 @@ impl OuterHeader {
|
||||
pub struct InnerHeader {
|
||||
pub protocol_version: u8,
|
||||
pub reserved: [u8; 3],
|
||||
pub message_type: MessageType,
|
||||
}
|
||||
|
||||
impl InnerHeader {
|
||||
pub const SIZE: usize = 8; // protocol_version(1) + reserved(3) + message_type(4)
|
||||
pub const SIZE: usize = 4; // protocol_version(1) + reserved(3)
|
||||
|
||||
pub fn encode(&self, dst: &mut BytesMut) {
|
||||
// protocol version
|
||||
@@ -70,9 +67,6 @@ impl InnerHeader {
|
||||
|
||||
// reserved
|
||||
dst.put_slice(&self.reserved);
|
||||
|
||||
// message type
|
||||
dst.put_slice(&(self.message_type as u32).to_le_bytes());
|
||||
}
|
||||
|
||||
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
|
||||
@@ -104,14 +98,9 @@ impl InnerHeader {
|
||||
warn!("received non-zero reserved bytes. got: {reserved:?}");
|
||||
}
|
||||
|
||||
let msg_type_raw = u32::from_le_bytes([src[4], src[5], src[6], src[7]]);
|
||||
let message_type = MessageType::from_u32(msg_type_raw)
|
||||
.ok_or_else(|| MalformedLpPacketError::invalid_message_type(msg_type_raw))?;
|
||||
|
||||
Ok(InnerHeader {
|
||||
protocol_version,
|
||||
reserved,
|
||||
message_type,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -129,12 +118,7 @@ pub struct LpHeader {
|
||||
}
|
||||
|
||||
impl LpHeader {
|
||||
pub fn new(
|
||||
receiver_idx: LpReceiverIndex,
|
||||
counter: u64,
|
||||
protocol_version: u8,
|
||||
message_type: MessageType,
|
||||
) -> Self {
|
||||
pub fn new(receiver_idx: LpReceiverIndex, counter: u64, protocol_version: u8) -> Self {
|
||||
Self {
|
||||
outer: OuterHeader {
|
||||
receiver_idx,
|
||||
@@ -143,7 +127,6 @@ impl LpHeader {
|
||||
inner: InnerHeader {
|
||||
protocol_version,
|
||||
reserved: [0u8; 3],
|
||||
message_type,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
+102
-348
@@ -2,109 +2,119 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::packet::error::MalformedLpPacketError;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
|
||||
#[repr(u32)]
|
||||
pub enum MessageType {
|
||||
/// The party is busy
|
||||
Busy = 0x0000,
|
||||
|
||||
/// Encrypted payload
|
||||
EncryptedData = 0x0001,
|
||||
|
||||
/// Receiver should forward this message via telescoping
|
||||
ForwardPacket = 0x0002,
|
||||
|
||||
/// Receiver index collision - client should retry with new index
|
||||
Collision = 0x0003,
|
||||
|
||||
/// Acknowledgment - gateway confirms receipt of message
|
||||
Ack = 0x0004,
|
||||
|
||||
/// General error
|
||||
Error = 0x00FF,
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct LpMessageHeader {
|
||||
pub kind: LpMessageType,
|
||||
pub message_attributes: [u8; 14],
|
||||
}
|
||||
|
||||
impl MessageType {
|
||||
pub(crate) fn from_u32(value: u32) -> Option<Self> {
|
||||
MessageType::try_from(value).ok()
|
||||
}
|
||||
impl LpMessageHeader {
|
||||
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
|
||||
|
||||
pub fn to_u32(&self) -> u32 {
|
||||
u32::from(*self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ApplicationData(pub Vec<u8>);
|
||||
|
||||
impl ApplicationData {
|
||||
pub fn new(bytes: Vec<u8>) -> Self {
|
||||
Self(bytes)
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.0.len()
|
||||
}
|
||||
|
||||
fn encode(&self, dst: &mut BytesMut) {
|
||||
dst.put_slice(&self.0);
|
||||
}
|
||||
|
||||
fn decode(bytes: &[u8]) -> Result<Self, MalformedLpPacketError> {
|
||||
Ok(ApplicationData(bytes.to_vec()))
|
||||
}
|
||||
}
|
||||
|
||||
/// General human-readable error message
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ErrorPacketData {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
impl ErrorPacketData {
|
||||
pub fn new(message: impl Into<String>) -> Self {
|
||||
ErrorPacketData {
|
||||
message: message.into(),
|
||||
pub fn new(kind: LpMessageType, message_attributes: [u8; 14]) -> Self {
|
||||
Self {
|
||||
kind,
|
||||
message_attributes,
|
||||
}
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
// length-encoding + message
|
||||
4 + self.message.len()
|
||||
}
|
||||
|
||||
fn encode(&self, dst: &mut BytesMut) {
|
||||
dst.put_u32_le(self.message.len() as u32);
|
||||
dst.put_slice(self.message.as_bytes());
|
||||
}
|
||||
|
||||
fn decode(bytes: &[u8]) -> Result<Self, MalformedLpPacketError> {
|
||||
if bytes.len() < 4 {
|
||||
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
|
||||
"Too few bytes to deserialise ErrorPacketData. got {}",
|
||||
bytes.len()
|
||||
)));
|
||||
pub fn new_no_attributes(kind: LpMessageType) -> Self {
|
||||
Self {
|
||||
kind,
|
||||
message_attributes: [0; 14],
|
||||
}
|
||||
|
||||
let message_len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
|
||||
if bytes[4..].len() != message_len {
|
||||
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
|
||||
"Wrong number of bytes to deserialise ErrorPacketData. got {}. Expected {}",
|
||||
bytes.len(),
|
||||
4 + message_len
|
||||
)));
|
||||
}
|
||||
|
||||
let message = String::from_utf8_lossy(&bytes[4..]).to_string();
|
||||
|
||||
Ok(ErrorPacketData { message })
|
||||
}
|
||||
|
||||
/// Encode directly into a BytesMut buffer
|
||||
pub fn encode(&self, dst: &mut BytesMut) {
|
||||
dst.put_u16_le(self.kind as u16);
|
||||
dst.put_slice(&self.message_attributes);
|
||||
}
|
||||
|
||||
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
|
||||
if src.len() < Self::SIZE {
|
||||
return Err(MalformedLpPacketError::InsufficientData);
|
||||
}
|
||||
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
|
||||
|
||||
let kind = LpMessageType::try_from(raw_kind)
|
||||
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
|
||||
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let message_attributes = src[2..16].try_into().unwrap();
|
||||
Ok(Self {
|
||||
kind,
|
||||
message_attributes,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Represent application data being sent in Transport mode
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct LpMessage {
|
||||
pub header: LpMessageHeader,
|
||||
pub content: Bytes,
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for LpMessage {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
&self.content
|
||||
}
|
||||
}
|
||||
|
||||
impl LpMessage {
|
||||
pub fn new(kind: LpMessageType, content: impl Into<Bytes>) -> Self {
|
||||
Self {
|
||||
header: LpMessageHeader::new_no_attributes(kind),
|
||||
content: content.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode(&self, dst: &mut BytesMut) {
|
||||
self.header.encode(dst);
|
||||
|
||||
dst.put_slice(&self.content);
|
||||
}
|
||||
|
||||
pub fn decode(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
|
||||
let header = LpMessageHeader::parse(src)?;
|
||||
let content = src[LpMessageHeader::SIZE..].to_vec().into();
|
||||
|
||||
Ok(Self { header, content })
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> LpMessageType {
|
||||
self.header.kind
|
||||
}
|
||||
|
||||
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpMessageType::Opaque, content)
|
||||
}
|
||||
|
||||
pub fn new_registration(data: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpMessageType::Registration, data)
|
||||
}
|
||||
|
||||
pub fn new_forward(data: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpMessageType::Forward, data)
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> usize {
|
||||
LpMessageHeader::SIZE + self.content.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Represent kind of application data being sent in Transport mode
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
|
||||
#[repr(u16)]
|
||||
pub enum LpMessageType {
|
||||
Opaque = 0,
|
||||
Registration = 1,
|
||||
Forward = 2,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -163,24 +173,6 @@ impl ForwardPacketData {
|
||||
}
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
// 1 byte length of target lp address type
|
||||
// +
|
||||
// {4,16} target_lp_address IPv{4,6}
|
||||
// +
|
||||
// 2 bytes target_lp_address port
|
||||
// +
|
||||
// 4 bytes for expected response size
|
||||
// +
|
||||
// 4 bytes of length of inner packet bytes
|
||||
// +
|
||||
// inner_packet_bytes.len()
|
||||
match self.target_lp_address {
|
||||
SocketAddr::V4(_) => 1 + 4 + 2 + 4 + 4 + self.inner_packet_bytes.len(),
|
||||
SocketAddr::V6(_) => 1 + 16 + 2 + 4 + 4 + self.inner_packet_bytes.len(),
|
||||
}
|
||||
}
|
||||
|
||||
// 0 || [4B ipv4] || [2B port] || [4B res size] || [4B plen] || payload
|
||||
// 1 || [16B ipv6] || [2B port] || [4B res size] || [4B plen] || payload
|
||||
fn encode(&self, dst: &mut BytesMut) {
|
||||
@@ -261,241 +253,3 @@ impl ForwardPacketData {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum LpMessage {
|
||||
/// The party is busy
|
||||
Busy,
|
||||
|
||||
/// Application payload is being sent
|
||||
ApplicationData(ApplicationData),
|
||||
|
||||
/// Receiver should forward this message via telescoping
|
||||
ForwardPacket(ForwardPacketData),
|
||||
|
||||
/// Receiver index collision - client should retry with new receiver_index
|
||||
Collision,
|
||||
|
||||
/// Acknowledgment - gateway confirms receipt of message
|
||||
Ack,
|
||||
|
||||
/// An error has occurred
|
||||
Error(ErrorPacketData),
|
||||
}
|
||||
|
||||
impl From<ApplicationData> for LpMessage {
|
||||
fn from(value: ApplicationData) -> Self {
|
||||
LpMessage::ApplicationData(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ForwardPacketData> for LpMessage {
|
||||
fn from(value: ForwardPacketData) -> Self {
|
||||
LpMessage::ForwardPacket(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for LpMessage {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
LpMessage::Busy => write!(f, "Busy"),
|
||||
LpMessage::ApplicationData(_) => write!(f, "EncryptedData"),
|
||||
LpMessage::ForwardPacket(_) => write!(f, "ForwardPacket"),
|
||||
LpMessage::Collision => write!(f, "Collision"),
|
||||
LpMessage::Ack => write!(f, "Ack"),
|
||||
LpMessage::Error(_) => write!(f, "Error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LpMessage {
|
||||
#[deprecated(note = "is it actually needed?")]
|
||||
pub fn payload(&self) -> &[u8] {
|
||||
match self {
|
||||
LpMessage::Busy => &[],
|
||||
LpMessage::ApplicationData(payload) => payload.0.as_slice(),
|
||||
LpMessage::ForwardPacket(_) => &[], // Structured data, serialized in encode_content
|
||||
LpMessage::Collision => &[],
|
||||
LpMessage::Ack => &[],
|
||||
LpMessage::Error(_) => &[], // Structured data, serialized in encode_content (?)
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated(note = "is it actually needed?")]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
match self {
|
||||
LpMessage::Busy => true,
|
||||
LpMessage::ApplicationData(payload) => payload.0.is_empty(),
|
||||
LpMessage::ForwardPacket(_) => false, // Always has data
|
||||
LpMessage::Collision => true,
|
||||
LpMessage::Ack => true,
|
||||
LpMessage::Error(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
match self {
|
||||
LpMessage::Busy => 0,
|
||||
LpMessage::ApplicationData(payload) => payload.len(),
|
||||
LpMessage::ForwardPacket(payload) => payload.len(),
|
||||
LpMessage::Collision => 0,
|
||||
LpMessage::Ack => 0,
|
||||
LpMessage::Error(payload) => payload.len(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn typ(&self) -> MessageType {
|
||||
match self {
|
||||
LpMessage::Busy => MessageType::Busy,
|
||||
LpMessage::ApplicationData(_) => MessageType::EncryptedData,
|
||||
LpMessage::ForwardPacket(_) => MessageType::ForwardPacket,
|
||||
LpMessage::Collision => MessageType::Collision,
|
||||
LpMessage::Ack => MessageType::Ack,
|
||||
LpMessage::Error(_) => MessageType::Error,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode_content(&self, dst: &mut BytesMut) {
|
||||
match self {
|
||||
LpMessage::Busy => { /* No content */ }
|
||||
LpMessage::ApplicationData(payload) => payload.encode(dst),
|
||||
LpMessage::ForwardPacket(data) => data.encode(dst),
|
||||
LpMessage::Collision => { /* No content */ }
|
||||
LpMessage::Ack => { /* No content */ }
|
||||
LpMessage::Error(data) => data.encode(dst),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse message from its type and content bytes.
|
||||
///
|
||||
/// Used when decrypting outer-encrypted packets where the message type
|
||||
/// was encrypted along with the content.
|
||||
pub fn decode_content(
|
||||
content: &[u8],
|
||||
message_type: MessageType,
|
||||
) -> Result<Self, MalformedLpPacketError> {
|
||||
match message_type {
|
||||
MessageType::Busy => {
|
||||
content.ensure_empty()?;
|
||||
Ok(LpMessage::Busy)
|
||||
}
|
||||
MessageType::EncryptedData => Ok(LpMessage::ApplicationData(ApplicationData::decode(
|
||||
content,
|
||||
)?)),
|
||||
MessageType::ForwardPacket => Ok(LpMessage::ForwardPacket(ForwardPacketData::decode(
|
||||
content,
|
||||
)?)),
|
||||
MessageType::Collision => {
|
||||
content.ensure_empty()?;
|
||||
Ok(LpMessage::Collision)
|
||||
}
|
||||
MessageType::Ack => {
|
||||
content.ensure_empty()?;
|
||||
Ok(LpMessage::Ack)
|
||||
}
|
||||
MessageType::Error => Ok(LpMessage::Error(ErrorPacketData::decode(content)?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper trait for improving readability to return error if bytes content is not empty
|
||||
trait EnsureEmptyContent {
|
||||
fn ensure_empty(&self) -> Result<(), MalformedLpPacketError>;
|
||||
}
|
||||
|
||||
impl EnsureEmptyContent for &[u8] {
|
||||
fn ensure_empty(&self) -> Result<(), MalformedLpPacketError> {
|
||||
if !self.is_empty() {
|
||||
return Err(MalformedLpPacketError::InvalidPayloadSize {
|
||||
expected: 0,
|
||||
actual: self.len(),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::packet::{InnerHeader, LpHeader, LpPacket, OuterHeader};
|
||||
|
||||
#[test]
|
||||
fn encoding() {
|
||||
let message = LpMessage::ApplicationData(ApplicationData(vec![11u8; 124]));
|
||||
|
||||
let resp_header = LpHeader {
|
||||
outer: OuterHeader {
|
||||
receiver_idx: 456,
|
||||
counter: 123,
|
||||
},
|
||||
inner: InnerHeader {
|
||||
protocol_version: 1,
|
||||
reserved: [0u8; 3],
|
||||
message_type: MessageType::EncryptedData,
|
||||
},
|
||||
};
|
||||
|
||||
let packet = LpPacket {
|
||||
header: resp_header,
|
||||
message,
|
||||
};
|
||||
|
||||
// Just print packet for debug, will be captured in test output
|
||||
println!("{packet:?}");
|
||||
|
||||
// Verify message type
|
||||
assert!(matches!(packet.message.typ(), MessageType::EncryptedData));
|
||||
|
||||
// Verify correct data in message
|
||||
match &packet.message {
|
||||
LpMessage::ApplicationData(data) => {
|
||||
assert_eq!(*data, ApplicationData(vec![11u8; 124]));
|
||||
}
|
||||
_ => panic!("Wrong message type"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forward_message_encoding() {
|
||||
let msg1 = ForwardPacketData {
|
||||
target_lp_address: "1.2.3.4:5678".parse().unwrap(),
|
||||
expected_response_size: ExpectedResponseSize::Transport,
|
||||
inner_packet_bytes: vec![],
|
||||
};
|
||||
|
||||
let msg2 = ForwardPacketData {
|
||||
target_lp_address: "1.2.3.4:5678".parse().unwrap(),
|
||||
expected_response_size: ExpectedResponseSize::Handshake(250),
|
||||
inner_packet_bytes: vec![42u8; 64],
|
||||
};
|
||||
|
||||
let msg3 = ForwardPacketData {
|
||||
target_lp_address: "[2001:db8::1]:8080".parse().unwrap(),
|
||||
expected_response_size: ExpectedResponseSize::Transport,
|
||||
inner_packet_bytes: vec![],
|
||||
};
|
||||
|
||||
let msg4 = ForwardPacketData {
|
||||
target_lp_address: "[2001:db8::1]:8080".parse().unwrap(),
|
||||
expected_response_size: ExpectedResponseSize::Handshake(250),
|
||||
inner_packet_bytes: vec![42u8; 64],
|
||||
};
|
||||
|
||||
let b = msg1.to_bytes();
|
||||
let msg1_r = ForwardPacketData::decode(&b).unwrap();
|
||||
assert_eq!(msg1_r, msg1);
|
||||
|
||||
let b = msg2.to_bytes();
|
||||
let msg2_r = ForwardPacketData::decode(&b).unwrap();
|
||||
assert_eq!(msg2_r, msg2);
|
||||
|
||||
let b = msg3.to_bytes();
|
||||
let msg3_r = ForwardPacketData::decode(&b).unwrap();
|
||||
assert_eq!(msg3_r, msg3);
|
||||
|
||||
let b = msg4.to_bytes();
|
||||
let msg4_r = ForwardPacketData::decode(&b).unwrap();
|
||||
assert_eq!(msg4_r, msg4);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::fmt::{Debug, Formatter};
|
||||
|
||||
pub use error::MalformedLpPacketError;
|
||||
pub use header::{InnerHeader, LpHeader, OuterHeader};
|
||||
pub use message::{ApplicationData, ForwardPacketData, LpMessage, MessageType};
|
||||
pub use message::{ForwardPacketData, LpMessage};
|
||||
|
||||
pub mod error;
|
||||
pub mod header;
|
||||
@@ -78,7 +78,7 @@ impl EncryptedLpPacket {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
#[derive(Clone, PartialEq)]
|
||||
pub struct LpPacket {
|
||||
pub(crate) header: LpHeader,
|
||||
pub(crate) message: LpMessage,
|
||||
@@ -95,10 +95,6 @@ impl LpPacket {
|
||||
Self { header, message }
|
||||
}
|
||||
|
||||
pub fn typ(&self) -> MessageType {
|
||||
self.message.typ()
|
||||
}
|
||||
|
||||
pub fn message(&self) -> &LpMessage {
|
||||
&self.message
|
||||
}
|
||||
@@ -119,8 +115,6 @@ impl LpPacket {
|
||||
|
||||
pub(crate) fn dbg_encode(&self, dst: &mut BytesMut) {
|
||||
self.header.dbg_encode(dst);
|
||||
|
||||
dst.put_slice(&(self.message.typ() as u16).to_le_bytes());
|
||||
self.message.encode_content(dst);
|
||||
self.message.encode(dst)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! This module implements session management functionality, including replay protection
|
||||
|
||||
use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
|
||||
use crate::packet::{ApplicationData, EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
|
||||
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
|
||||
use crate::peer::{LpLocalPeer, LpRemotePeer};
|
||||
use crate::peer_config::LpReceiverIndex;
|
||||
use crate::psq::{
|
||||
@@ -174,12 +174,7 @@ impl LpSession {
|
||||
|
||||
pub fn next_packet(&mut self, message: LpMessage) -> Result<LpPacket, LpError> {
|
||||
let counter = self.next_counter();
|
||||
let header = LpHeader::new(
|
||||
self.receiver_index(),
|
||||
counter,
|
||||
self.protocol_version,
|
||||
message.typ(),
|
||||
);
|
||||
let header = LpHeader::new(self.receiver_index(), counter, self.protocol_version);
|
||||
let packet = LpPacket::new(header, message);
|
||||
Ok(packet)
|
||||
}
|
||||
@@ -255,9 +250,9 @@ impl LpSession {
|
||||
/// * `Err(LpError)` if the session is not in transport mode or encryption fails.
|
||||
pub(crate) fn encrypt_application_data(
|
||||
&mut self,
|
||||
data: Vec<u8>,
|
||||
data: LpMessage,
|
||||
) -> Result<EncryptedLpPacket, LpError> {
|
||||
let packet = self.next_packet(LpMessage::ApplicationData(ApplicationData::new(data)))?;
|
||||
let packet = self.next_packet(data)?;
|
||||
encrypt_lp_packet(packet, &mut self.active_transport)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::packet::EncryptedLpPacket;
|
||||
use crate::state_machine::{LpAction, LpData, LpInput, LpStateBare};
|
||||
use crate::packet::{EncryptedLpPacket, LpMessage};
|
||||
use crate::state_machine::{LpAction, LpInput, LpStateBare};
|
||||
use crate::{LpError, SessionManager, SessionsMock};
|
||||
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
|
||||
|
||||
@@ -9,7 +9,7 @@ mod tests {
|
||||
trait ActionExtract {
|
||||
fn ciphertext(self) -> EncryptedLpPacket;
|
||||
|
||||
fn data(self) -> LpData;
|
||||
fn data(self) -> LpMessage;
|
||||
}
|
||||
|
||||
impl ActionExtract for LpAction {
|
||||
@@ -21,7 +21,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn data(self) -> LpData {
|
||||
fn data(self) -> LpMessage {
|
||||
if let LpAction::DeliverData(data) = self {
|
||||
data
|
||||
} else {
|
||||
@@ -54,7 +54,7 @@ mod tests {
|
||||
// --- A sends to B ---
|
||||
let plaintext_a = format!("A->B Message {i}").into_bytes();
|
||||
let ciphertext_a = session_manager_1
|
||||
.send_data(peer_a_sm, LpData::new_opaque(plaintext_a.clone()))
|
||||
.send_data(peer_a_sm, LpMessage::new_opaque(plaintext_a.clone()))
|
||||
.unwrap()
|
||||
.ciphertext();
|
||||
|
||||
@@ -69,7 +69,7 @@ mod tests {
|
||||
// --- B sends to A ---
|
||||
let plaintext_b = format!("B->A Message {i}").into_bytes();
|
||||
let ciphertext_b = session_manager_2
|
||||
.send_data(peer_b_sm, LpData::new_opaque(plaintext_b.clone()))
|
||||
.send_data(peer_b_sm, LpMessage::new_opaque(plaintext_b.clone()))
|
||||
.unwrap()
|
||||
.ciphertext();
|
||||
|
||||
@@ -195,8 +195,10 @@ mod tests {
|
||||
|
||||
// --- 3. Simulate Data Transfer via process_input ---
|
||||
println!("Starting data transfer simulation via process_input...");
|
||||
let plaintext_a_to_b = LpData::new_opaque(b"Hello from A via process_input!".to_vec());
|
||||
let plaintext_b_to_a = LpData::new_opaque(b"Hello from B via process_input!".to_vec());
|
||||
let plaintext_a_to_b =
|
||||
LpMessage::new_opaque(b"Hello from A via process_input!".to_vec());
|
||||
let plaintext_b_to_a =
|
||||
LpMessage::new_opaque(b"Hello from B via process_input!".to_vec());
|
||||
|
||||
// --- A sends to B ---
|
||||
println!(" A sends to B");
|
||||
@@ -272,8 +274,8 @@ mod tests {
|
||||
println!("Testing out-of-order reception via process_input...");
|
||||
|
||||
// A prepares N+1 then N
|
||||
let data_n_plus_1 = LpData::new_opaque(b"Message N+1".to_vec());
|
||||
let data_n = LpData::new_opaque(b"Message N".to_vec());
|
||||
let data_n_plus_1 = LpMessage::new_opaque(b"Message N+1".to_vec());
|
||||
let data_n = LpMessage::new_opaque(b"Message N".to_vec());
|
||||
|
||||
let action_send_n1 = session_manager_1
|
||||
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
|
||||
@@ -344,7 +346,7 @@ mod tests {
|
||||
// Further actions on A fail
|
||||
let send_after_close_a = session_manager_1.process_input(
|
||||
session_id,
|
||||
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
|
||||
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
|
||||
);
|
||||
assert!(send_after_close_a.is_err());
|
||||
assert!(matches!(
|
||||
@@ -366,7 +368,7 @@ mod tests {
|
||||
// Further actions on B fail
|
||||
let send_after_close_b = session_manager_2.process_input(
|
||||
session_id,
|
||||
LpInput::SendData(LpData::new_opaque(b"fail".to_vec())),
|
||||
LpInput::SendData(LpMessage::new_opaque(b"fail".to_vec())),
|
||||
);
|
||||
assert!(send_after_close_b.is_err());
|
||||
assert!(matches!(
|
||||
|
||||
@@ -6,9 +6,9 @@
|
||||
//! This module implements session lifecycle management functionality, handling
|
||||
//! creation, retrieval, and storage of sessions.
|
||||
|
||||
use crate::packet::EncryptedLpPacket;
|
||||
use crate::packet::{EncryptedLpPacket, LpMessage};
|
||||
use crate::peer_config::LpReceiverIndex;
|
||||
use crate::state_machine::{LpAction, LpData, LpInput, LpStateBare};
|
||||
use crate::state_machine::{LpAction, LpInput, LpStateBare};
|
||||
use crate::{LpError, LpSession, LpStateMachine};
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -44,7 +44,11 @@ impl SessionManager {
|
||||
self.with_state_machine_mut(lp_id, |sm| sm.process_input(input).transpose())?
|
||||
}
|
||||
|
||||
pub fn send_data(&mut self, lp_id: LpReceiverIndex, data: LpData) -> Result<LpAction, LpError> {
|
||||
pub fn send_data(
|
||||
&mut self,
|
||||
lp_id: LpReceiverIndex,
|
||||
data: LpMessage,
|
||||
) -> Result<LpAction, LpError> {
|
||||
self.process_input(lp_id, LpInput::SendData(data))?
|
||||
.ok_or(LpError::NotInTransport)
|
||||
}
|
||||
|
||||
@@ -5,12 +5,11 @@
|
||||
//! State machine ensures protocol steps execute in correct order. Invalid transitions
|
||||
//! return LpError, preventing protocol violations.
|
||||
|
||||
use crate::packet::{EncryptedLpPacket, LpMessage};
|
||||
use crate::packet::EncryptedLpPacket;
|
||||
use crate::packet::message::LpMessage;
|
||||
use crate::peer_config::LpReceiverIndex;
|
||||
use crate::session::SessionId;
|
||||
use crate::{LpError, session::LpSession};
|
||||
use bytes::{Buf, Bytes};
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
use std::mem;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -58,7 +57,7 @@ pub enum LpInput {
|
||||
ReceivePacket(EncryptedLpPacket),
|
||||
|
||||
/// Application wants to send data (only valid in Transport state).
|
||||
SendData(LpData),
|
||||
SendData(LpMessage),
|
||||
|
||||
/// Close the connection.
|
||||
Close,
|
||||
@@ -71,81 +70,12 @@ pub enum LpAction {
|
||||
SendPacket(EncryptedLpPacket),
|
||||
|
||||
/// Deliver decrypted application data received from the peer.
|
||||
DeliverData(LpData),
|
||||
DeliverData(LpMessage),
|
||||
|
||||
/// Inform the environment that the connection is closed.
|
||||
ConnectionClosed,
|
||||
}
|
||||
|
||||
/// Represent application data being sent in Transport mode
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct LpData {
|
||||
pub kind: LpDataKind,
|
||||
pub content: Bytes,
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for LpData {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
&self.content
|
||||
}
|
||||
}
|
||||
|
||||
impl LpData {
|
||||
pub fn new(kind: LpDataKind, content: impl Into<Bytes>) -> Self {
|
||||
Self {
|
||||
kind,
|
||||
content: content.into(),
|
||||
}
|
||||
}
|
||||
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpDataKind::Opaque, content)
|
||||
}
|
||||
|
||||
pub fn new_registration(data: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpDataKind::Registration, data)
|
||||
}
|
||||
|
||||
pub fn new_forward(data: impl Into<Bytes>) -> Self {
|
||||
Self::new(LpDataKind::Forward, data)
|
||||
}
|
||||
|
||||
pub fn to_vec(self) -> Vec<u8> {
|
||||
self.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LpData> for Vec<u8> {
|
||||
fn from(data: LpData) -> Self {
|
||||
let mut out = Vec::with_capacity(data.content.len() + 1);
|
||||
out.push(data.kind as u8);
|
||||
out.extend_from_slice(data.content.as_ref());
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<u8>> for LpData {
|
||||
type Error = LpError;
|
||||
|
||||
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
let kind = LpDataKind::try_from(value[0]).map_err(|_| {
|
||||
LpError::DeserializationError(format!("unknown data type: {}", value[0]))
|
||||
})?;
|
||||
let mut content = Bytes::from(value);
|
||||
content.advance(1);
|
||||
|
||||
Ok(LpData::new(kind, content))
|
||||
}
|
||||
}
|
||||
|
||||
/// Represent kind of application data being sent in Transport mode
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
|
||||
#[repr(u8)]
|
||||
pub enum LpDataKind {
|
||||
Opaque = 0,
|
||||
Registration = 1,
|
||||
Forward = 2,
|
||||
}
|
||||
|
||||
/// The Lewes Protocol State Machine.
|
||||
pub struct LpStateMachine {
|
||||
pub state: LpState,
|
||||
@@ -234,31 +164,10 @@ impl LpStateMachine {
|
||||
return (LpState::Transport(state), Some(Err(e)));
|
||||
}
|
||||
|
||||
// Check message type
|
||||
match packet.into_message() {
|
||||
// Normal encrypted data
|
||||
LpMessage::ApplicationData(payload) => {
|
||||
// Deliver data
|
||||
match payload.0.try_into() {
|
||||
Ok(data) => {
|
||||
let result_action = Some(Ok(LpAction::DeliverData(data)));
|
||||
(LpState::Transport(state), result_action)
|
||||
}
|
||||
Err(e) => {
|
||||
let reason = e.to_string();
|
||||
(LpState::Closed { reason }, Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
other => {
|
||||
// Unexpected message type in Transport state
|
||||
let err = LpError::InvalidStateTransition {
|
||||
state: "Transport".to_string(),
|
||||
input: format!("Unexpected message type: {other}"),
|
||||
};
|
||||
(LpState::Transport(state), Some(Err(err)))
|
||||
}
|
||||
}
|
||||
// 4. deliver the message
|
||||
let message = packet.message;
|
||||
let result_action = Some(Ok(LpAction::DeliverData(message)));
|
||||
(LpState::Transport(state), result_action)
|
||||
}
|
||||
LpInput::SendData(data) => {
|
||||
// Encrypt and send application data
|
||||
@@ -339,9 +248,9 @@ impl LpStateMachine {
|
||||
fn prepare_data_packet(
|
||||
&self,
|
||||
session: &mut LpSession,
|
||||
data: LpData,
|
||||
data: LpMessage,
|
||||
) -> Result<EncryptedLpPacket, LpError> {
|
||||
session.encrypt_application_data(data.to_vec())
|
||||
session.encrypt_application_data(data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,7 +295,7 @@ mod tests {
|
||||
|
||||
// --- Transport Phase ---
|
||||
println!("--- Step 1: Initiator sends data ---");
|
||||
let data_to_send_1 = LpData::new_opaque(b"hello responder".to_vec());
|
||||
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
|
||||
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
|
||||
let data_packet_1 = if let Some(Ok(LpAction::SendPacket(packet))) = init_actions_4 {
|
||||
packet.clone()
|
||||
@@ -405,7 +314,7 @@ mod tests {
|
||||
assert_eq!(resp_data_1, data_to_send_1);
|
||||
|
||||
println!("--- Step 3: Responder sends data ---");
|
||||
let data_to_send_2 = LpData::new_opaque(b"hello initiator".to_vec());
|
||||
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
|
||||
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
|
||||
let data_packet_2 = if let Some(Ok(LpAction::SendPacket(packet))) = resp_actions_6 {
|
||||
packet.clone()
|
||||
|
||||
@@ -393,10 +393,11 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic_lp_entry_registration() -> anyhow::Result<()> {
|
||||
// nym_test_utils::helpers::setup_test_logger();
|
||||
|
||||
for kem in KEM::iter() {
|
||||
let ciphersuite = Ciphersuite::default().with_kem(kem);
|
||||
|
||||
// nym_test_utils::helpers::setup_test_logger();
|
||||
// initialise random, but deterministic, keys, addresses, etc. for the parties
|
||||
let mut client_rng = u64_seeded_rng_09(0);
|
||||
let mut gateway_rng = u64_seeded_rng_09(1);
|
||||
|
||||
@@ -90,12 +90,6 @@ pub(crate) struct Cli {
|
||||
#[arg(value_delimiter = ',')]
|
||||
pub(crate) agent_key_list: Vec<String>,
|
||||
|
||||
#[clap(long, env = "NODE_STATUS_API_AGENT_REGION_MAP")]
|
||||
pub(crate) agent_region_map: Option<String>,
|
||||
|
||||
#[clap(long, env = "NODE_STATUS_API_REGION_CENTROIDS")]
|
||||
pub(crate) region_centroids: Option<String>,
|
||||
|
||||
#[clap(long, default_value = "120s", env = "AGENT_REQUEST_FRESHNESS")]
|
||||
#[arg(value_parser = parse_duration_humantime)]
|
||||
pub(crate) agent_request_freshness: time::Duration,
|
||||
|
||||
@@ -145,87 +145,6 @@ pub(crate) async fn assign_oldest_testrun(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn assign_nearest_testrun(
|
||||
conn: &mut DbConnection,
|
||||
agent_lat: f64,
|
||||
agent_lon: f64,
|
||||
) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let now = now_utc().unix_timestamp();
|
||||
// We rank queued testruns by distance between agent centroid and gateway coordinates.
|
||||
// Coordinates are read from explorer_pretty_bond.location.{latitude,longitude}.
|
||||
// Missing or malformed gateway coordinates pushes that node into FIFO fallback ordering by created_utc.
|
||||
let returning = sqlx::query!(
|
||||
r#"
|
||||
WITH ranked_queued AS (
|
||||
SELECT
|
||||
t.id,
|
||||
t.gateway_id,
|
||||
t.created_utc,
|
||||
CASE
|
||||
WHEN g.explorer_pretty_bond IS NULL THEN 1e12::double precision
|
||||
WHEN ((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude') ~ '^-?[0-9]+(\.[0-9]+)?$')
|
||||
AND ((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude') ~ '^-?[0-9]+(\.[0-9]+)?$')
|
||||
AND (((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision) BETWEEN -90.0 AND 90.0)
|
||||
AND (((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude')::double precision) BETWEEN -180.0 AND 180.0)
|
||||
THEN 6371.0 * 2.0 * ASIN(
|
||||
LEAST(1.0, SQRT(
|
||||
POWER(SIN(RADIANS((((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision) - $2) / 2.0)), 2)
|
||||
+ COS(RADIANS($2)) * COS(RADIANS((g.explorer_pretty_bond::jsonb -> 'location' ->> 'latitude')::double precision))
|
||||
* POWER(SIN(RADIANS((((g.explorer_pretty_bond::jsonb -> 'location' ->> 'longitude')::double precision) - $3) / 2.0)), 2)
|
||||
))
|
||||
)
|
||||
ELSE 1e12::double precision
|
||||
END AS distance
|
||||
FROM testruns t
|
||||
JOIN gateways g ON g.id = t.gateway_id
|
||||
WHERE t.status = $1
|
||||
ORDER BY distance ASC, t.created_utc ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE OF t SKIP LOCKED
|
||||
)
|
||||
UPDATE testruns
|
||||
SET
|
||||
status = $4,
|
||||
last_assigned_utc = $5
|
||||
FROM ranked_queued
|
||||
WHERE testruns.id = ranked_queued.id
|
||||
RETURNING
|
||||
testruns.id,
|
||||
testruns.gateway_id
|
||||
"#,
|
||||
TestRunStatus::Queued as i32,
|
||||
agent_lat,
|
||||
agent_lon,
|
||||
TestRunStatus::InProgress as i32,
|
||||
now,
|
||||
)
|
||||
.fetch_optional(conn.as_mut())
|
||||
.await?;
|
||||
|
||||
if let Some(testrun) = returning {
|
||||
let gw_identity = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
gateway_identity_key
|
||||
FROM gateways
|
||||
WHERE id = $1
|
||||
LIMIT 1"#,
|
||||
testrun.gateway_id
|
||||
)
|
||||
.fetch_one(conn.as_mut())
|
||||
.await?;
|
||||
|
||||
Ok(Some(TestrunAssignment {
|
||||
testrun_id: testrun.id,
|
||||
gateway_identity_key: gw_identity.gateway_identity_key,
|
||||
assigned_at_utc: now,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_testrun_status(
|
||||
conn: &mut DbConnection,
|
||||
testrun_id: i32,
|
||||
@@ -369,83 +288,3 @@ pub(crate) async fn update_testrun_status_by_gateway(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serde_json::Value;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Candidate {
|
||||
created_utc: i64,
|
||||
explorer_pretty_bond: Option<String>,
|
||||
}
|
||||
|
||||
fn distance_or_fallback_km(
|
||||
explorer_pretty_bond: Option<&str>,
|
||||
agent_lat: f64,
|
||||
agent_lon: f64,
|
||||
) -> f64 {
|
||||
let Some(raw) = explorer_pretty_bond else {
|
||||
return 1e12;
|
||||
};
|
||||
let Ok(value) = serde_json::from_str::<Value>(raw) else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(location) = value.get("location") else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(lat) = location.get("latitude").and_then(Value::as_f64) else {
|
||||
return 1e12;
|
||||
};
|
||||
let Some(lon) = location.get("longitude").and_then(Value::as_f64) else {
|
||||
return 1e12;
|
||||
};
|
||||
|
||||
let dlat = (lat - agent_lat).to_radians();
|
||||
let dlon = (lon - agent_lon).to_radians();
|
||||
let a = (dlat / 2.0).sin().powi(2)
|
||||
+ agent_lat.to_radians().cos() * lat.to_radians().cos() * (dlon / 2.0).sin().powi(2);
|
||||
6371.0 * 2.0 * a.sqrt().asin()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nearest_assignment_falls_back_behind_valid_geo_when_geo_missing() {
|
||||
let agent_lat = 50.1109;
|
||||
let agent_lon = 8.6821;
|
||||
let mut candidates = vec![
|
||||
Candidate {
|
||||
created_utc: 1,
|
||||
explorer_pretty_bond: None,
|
||||
},
|
||||
Candidate {
|
||||
created_utc: 2,
|
||||
explorer_pretty_bond: Some(
|
||||
r#"{"location":{"latitude":50.1109,"longitude":8.6821}}"#.to_string(),
|
||||
),
|
||||
},
|
||||
Candidate {
|
||||
created_utc: 0,
|
||||
explorer_pretty_bond: None,
|
||||
},
|
||||
];
|
||||
|
||||
candidates.sort_by(|a, b| {
|
||||
let da =
|
||||
distance_or_fallback_km(a.explorer_pretty_bond.as_deref(), agent_lat, agent_lon);
|
||||
let db =
|
||||
distance_or_fallback_km(b.explorer_pretty_bond.as_deref(), agent_lat, agent_lon);
|
||||
da.total_cmp(&db).then(a.created_utc.cmp(&b.created_utc))
|
||||
});
|
||||
|
||||
assert!(
|
||||
candidates[0].explorer_pretty_bond.is_some(),
|
||||
"expected valid-geo candidate first, got {:?}",
|
||||
candidates[0]
|
||||
);
|
||||
|
||||
// Missing geo fallback goes to FIFO order.
|
||||
assert!(candidates[1].explorer_pretty_bond.is_none());
|
||||
assert!(candidates[2].explorer_pretty_bond.is_none());
|
||||
assert!(candidates[1].created_utc < candidates[2].created_utc);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,21 +58,7 @@ async fn request_testrun(
|
||||
return Err(HttpError::no_testruns_available());
|
||||
}
|
||||
|
||||
let assignment_result = if let Some((region, centroid)) =
|
||||
state.agent_region_and_centroid(&request.payload.agent_public_key)
|
||||
{
|
||||
tracing::debug!(
|
||||
"Resolved agent region '{region}' (lat={}, lon={}), assigning nearest testrun",
|
||||
centroid.lat,
|
||||
centroid.lon
|
||||
);
|
||||
db::queries::testruns::assign_nearest_testrun(&mut conn, centroid.lat, centroid.lon).await
|
||||
} else {
|
||||
tracing::debug!("Agent region not configured, falling back to FIFO assignment");
|
||||
db::queries::testruns::assign_oldest_testrun(&mut conn).await
|
||||
};
|
||||
|
||||
match assignment_result {
|
||||
match db::queries::testruns::assign_oldest_testrun(&mut conn).await {
|
||||
Ok(res) => {
|
||||
let Some(assignment) = res else {
|
||||
tracing::debug!("No testruns available");
|
||||
|
||||
@@ -8,7 +8,7 @@ use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_task::ShutdownTracker;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
use tokio::{net::TcpListener, sync::RwLock};
|
||||
|
||||
/// Return handles that allow for graceful shutdown of server + awaiting its
|
||||
@@ -19,8 +19,6 @@ pub(crate) async fn start_http_api(
|
||||
http_port: u16,
|
||||
nym_http_cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, crate::http::state::RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -34,8 +32,6 @@ pub(crate) async fn start_http_api(
|
||||
db_pool,
|
||||
nym_http_cache_ttl,
|
||||
agent_key_list,
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
agent_max_count,
|
||||
agent_request_freshness_requirement,
|
||||
node_geocache,
|
||||
|
||||
@@ -36,8 +36,6 @@ pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
cache: HttpCache,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -52,8 +50,6 @@ impl AppState {
|
||||
db_pool: DbPool,
|
||||
cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_region_map: HashMap<PublicKey, String>,
|
||||
region_centroids: HashMap<String, RegionCentroid>,
|
||||
agent_max_count: i64,
|
||||
agent_request_freshness_requirement: time::Duration,
|
||||
node_geocache: NodeGeoCache,
|
||||
@@ -64,8 +60,6 @@ impl AppState {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl).await,
|
||||
agent_key_list,
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
agent_max_count,
|
||||
agent_request_freshness_requirement,
|
||||
node_geocache,
|
||||
@@ -91,15 +85,6 @@ impl AppState {
|
||||
self.agent_max_count
|
||||
}
|
||||
|
||||
pub(crate) fn agent_region_and_centroid(
|
||||
&self,
|
||||
agent_pubkey: &PublicKey,
|
||||
) -> Option<(&str, RegionCentroid)> {
|
||||
let region = self.agent_region_map.get(agent_pubkey)?;
|
||||
let centroid = self.region_centroids.get(region)?;
|
||||
Some((region.as_str(), *centroid))
|
||||
}
|
||||
|
||||
pub(crate) fn node_geocache(&self) -> NodeGeoCache {
|
||||
self.node_geocache.clone()
|
||||
}
|
||||
@@ -166,12 +151,6 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) struct RegionCentroid {
|
||||
pub(crate) lat: f64,
|
||||
pub(crate) lon: f64,
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
static DVPN_GATEWAYS_LIST_KEY: &str = "dvpn_gateways";
|
||||
static DVPN_EXIT_GATEWAY_IPS: &str = "dvpn_exit_gateway_ips";
|
||||
|
||||
@@ -10,7 +10,7 @@ use nym_crypto::asymmetric::ed25519::PublicKey;
|
||||
use nym_network_defaults::setup_env;
|
||||
use nym_task::ShutdownManager;
|
||||
use nym_validator_client::nyxd::NyxdClient;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
mod cli;
|
||||
mod db;
|
||||
@@ -41,13 +41,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
.map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from))
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
tracing::info!("Registered {} agent keys", agent_key_list.len());
|
||||
let agent_region_map = parse_agent_region_map(args.agent_region_map.as_deref())?;
|
||||
let region_centroids = parse_region_centroids(args.region_centroids.as_deref())?;
|
||||
tracing::info!(
|
||||
"Configured {} agent region mappings and {} region centroids",
|
||||
agent_region_map.len(),
|
||||
region_centroids.len()
|
||||
);
|
||||
|
||||
let connection_url = args.database_url.clone();
|
||||
if std::env::var("SHOW_CONFIG").ok().is_some() {
|
||||
@@ -193,8 +186,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
args.http_port,
|
||||
args.nym_http_cache_ttl,
|
||||
agent_key_list.to_owned(),
|
||||
agent_region_map,
|
||||
region_centroids,
|
||||
args.max_agent_count,
|
||||
args.agent_request_freshness,
|
||||
geocache,
|
||||
@@ -211,133 +202,3 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_agent_region_map(raw: Option<&str>) -> anyhow::Result<HashMap<PublicKey, String>> {
|
||||
let mut out = HashMap::new();
|
||||
let Some(raw) = raw else {
|
||||
return Ok(out);
|
||||
};
|
||||
|
||||
for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
|
||||
let (pubkey_raw, region_raw) = entry.split_once('=').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_AGENT_REGION_MAP entry '{entry}', expected '<pubkey>=<region>'"
|
||||
)
|
||||
})?;
|
||||
let pubkey =
|
||||
PublicKey::from_base58_string(pubkey_raw.trim()).map_err(anyhow::Error::from)?;
|
||||
let region = region_raw.trim();
|
||||
if region.is_empty() {
|
||||
anyhow::bail!("empty region in NODE_STATUS_API_AGENT_REGION_MAP entry '{entry}'");
|
||||
}
|
||||
out.insert(pubkey, region.to_string());
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn parse_region_centroids(
|
||||
raw: Option<&str>,
|
||||
) -> anyhow::Result<HashMap<String, http::state::RegionCentroid>> {
|
||||
let mut out = HashMap::new();
|
||||
let Some(raw) = raw else {
|
||||
return Ok(out);
|
||||
};
|
||||
|
||||
for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
|
||||
let (region_raw, lat_lon_raw) = entry.split_once('=').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_REGION_CENTROIDS entry '{entry}', expected '<region>=<lat>:<lon>'"
|
||||
)
|
||||
})?;
|
||||
let region = region_raw.trim();
|
||||
let (lat_raw, lon_raw) = lat_lon_raw.split_once(':').ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"malformed NODE_STATUS_API_REGION_CENTROIDS entry '{entry}', expected '<region>=<lat>:<lon>'"
|
||||
)
|
||||
})?;
|
||||
let lat = lat_raw.trim().parse::<f64>().map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"invalid latitude '{}' in entry '{}': {err}",
|
||||
lat_raw.trim(),
|
||||
entry
|
||||
)
|
||||
})?;
|
||||
let lon = lon_raw.trim().parse::<f64>().map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"invalid longitude '{}' in entry '{}': {err}",
|
||||
lon_raw.trim(),
|
||||
entry
|
||||
)
|
||||
})?;
|
||||
|
||||
if region.is_empty() {
|
||||
anyhow::bail!("empty region in NODE_STATUS_API_REGION_CENTROIDS entry '{entry}'");
|
||||
}
|
||||
|
||||
out.insert(region.to_string(), http::state::RegionCentroid { lat, lon });
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parses_agent_region_map() {
|
||||
let pubkey_a = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[1; 32])
|
||||
.expect("failed to create test public key A")
|
||||
.to_base58_string();
|
||||
let pubkey_b = nym_crypto::asymmetric::ed25519::PublicKey::from_bytes(&[2; 32])
|
||||
.expect("failed to create test public key B")
|
||||
.to_base58_string();
|
||||
let raw = format!("{pubkey_a}=eu-west,{pubkey_b}=asia-tokyo");
|
||||
|
||||
let parsed = parse_agent_region_map(Some(&raw)).expect("failed to parse map");
|
||||
|
||||
assert_eq!(parsed.len(), 2);
|
||||
let key_a = PublicKey::from_base58_string(&pubkey_a).expect("failed to decode key A");
|
||||
let key_b = PublicKey::from_base58_string(&pubkey_b).expect("failed to decode key B");
|
||||
assert_eq!(parsed.get(&key_a).map(String::as_str), Some("eu-west"));
|
||||
assert_eq!(parsed.get(&key_b).map(String::as_str), Some("asia-tokyo"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_agent_region_map_entry_returns_error() {
|
||||
let err = parse_agent_region_map(Some("this_is_not_valid")).expect_err("expected error");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("malformed NODE_STATUS_API_AGENT_REGION_MAP entry"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_region_centroids() {
|
||||
let raw = "eu-west=50.1109:8.6821,asia-tokyo=35.6762:139.6503";
|
||||
|
||||
let parsed = parse_region_centroids(Some(raw)).expect("failed to parse centroids");
|
||||
|
||||
assert_eq!(parsed.len(), 2);
|
||||
let eu = parsed.get("eu-west").expect("missing eu-west centroid");
|
||||
let asia = parsed
|
||||
.get("asia-tokyo")
|
||||
.expect("missing asia-tokyo centroid");
|
||||
assert!((eu.lat - 50.1109).abs() < 1e-9);
|
||||
assert!((eu.lon - 8.6821).abs() < 1e-9);
|
||||
assert!((asia.lat - 35.6762).abs() < 1e-9);
|
||||
assert!((asia.lon - 139.6503).abs() < 1e-9);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_region_centroids_entry_returns_error() {
|
||||
let err = parse_region_centroids(Some("eu-west=50.1|8.6")).expect_err("expected error");
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("malformed NODE_STATUS_API_REGION_CENTROIDS entry"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::lp::LpReceiverIndex;
|
||||
use nym_lp::state_machine::{LpAction, LpDataKind};
|
||||
use nym_lp::packet::message::LpMessageType;
|
||||
use nym_lp::state_machine::LpAction;
|
||||
use nym_lp::transport::LpTransportError;
|
||||
use nym_lp::{LpError, packet::MalformedLpPacketError};
|
||||
use std::net::SocketAddr;
|
||||
@@ -44,7 +45,7 @@ pub enum LpHandlerError {
|
||||
MalformedLpPacket(#[from] MalformedLpPacketError),
|
||||
|
||||
#[error("received payload type of an unexpected type: {typ:?}")]
|
||||
UnexpectedLpPayload { typ: LpDataKind },
|
||||
UnexpectedLpPayload { typ: LpMessageType },
|
||||
|
||||
#[error("timed out while attempting to send to/receive from the connection")]
|
||||
ConnectionTimeout,
|
||||
|
||||
@@ -4,8 +4,9 @@
|
||||
use super::{LpHandlerState, LpReceiverIndex, TimestampedState};
|
||||
use crate::node::lp::error::LpHandlerError;
|
||||
use dashmap::mapref::one::RefMut;
|
||||
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData};
|
||||
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
|
||||
use nym_lp::packet::message::LpMessageType;
|
||||
use nym_lp::packet::{EncryptedLpPacket, ForwardPacketData, LpMessage};
|
||||
use nym_lp::state_machine::{LpAction, LpInput};
|
||||
use nym_lp::transport::LpHandshakeChannel;
|
||||
use nym_lp::transport::traits::LpTransportChannel;
|
||||
use nym_lp::{LpSession, LpStateMachine, packet::message::ExpectedResponseSize};
|
||||
@@ -301,13 +302,14 @@ where
|
||||
async fn handle_decrypted_payload(
|
||||
&mut self,
|
||||
receiver_idx: LpReceiverIndex,
|
||||
decrypted_data: LpData,
|
||||
decrypted_data: LpMessage,
|
||||
) -> Result<(), LpHandlerError> {
|
||||
let remote = self.remote_addr;
|
||||
|
||||
let header = decrypted_data.header;
|
||||
let bytes = decrypted_data.content;
|
||||
match decrypted_data.kind {
|
||||
LpDataKind::Registration => {
|
||||
match header.kind {
|
||||
LpMessageType::Registration => {
|
||||
let request = LpRegistrationRequest::try_deserialise(&bytes)
|
||||
.map_err(|source| LpHandlerError::MalformedRegistrationRequest { source })?;
|
||||
|
||||
@@ -319,13 +321,13 @@ where
|
||||
self.handle_registration_request(receiver_idx, request)
|
||||
.await
|
||||
}
|
||||
LpDataKind::Forward => {
|
||||
LpMessageType::Forward => {
|
||||
let forward_data = ForwardPacketData::decode(&bytes)?;
|
||||
|
||||
self.handle_forwarding_request(receiver_idx, forward_data)
|
||||
.await
|
||||
}
|
||||
typ @ LpDataKind::Opaque => {
|
||||
typ @ LpMessageType::Opaque => {
|
||||
// Neither registration nor forwarding - unknown payload type
|
||||
warn!(
|
||||
"Unknown transport payload type from {remote} (receiver_idx={receiver_idx}). dropping {} bytes",
|
||||
@@ -341,14 +343,14 @@ where
|
||||
async fn send_response_packet(
|
||||
&mut self,
|
||||
serialised_response: Vec<u8>,
|
||||
response_kind: LpDataKind,
|
||||
response_kind: LpMessageType,
|
||||
) -> Result<(), LpHandlerError> {
|
||||
let mut state_entry = self.state_entry_mut()?;
|
||||
|
||||
// Access session via state machine for subsession support
|
||||
let state_machine = &mut state_entry.value_mut().state;
|
||||
|
||||
let wrapped_lp_data = LpData::new(response_kind, serialised_response);
|
||||
let wrapped_lp_data = LpMessage::new(response_kind, serialised_response);
|
||||
|
||||
// Process packet through state machine
|
||||
let action = state_machine
|
||||
@@ -378,7 +380,7 @@ where
|
||||
.serialise()
|
||||
.map_err(|source| LpHandlerError::MalformedRegistrationRequest { source })?;
|
||||
|
||||
self.send_response_packet(response_bytes, LpDataKind::Registration)
|
||||
self.send_response_packet(response_bytes, LpMessageType::Registration)
|
||||
.await?;
|
||||
|
||||
match response.status {
|
||||
@@ -415,7 +417,7 @@ where
|
||||
// Forward the packet to the target gateway and retrieve its response
|
||||
let response_bytes = self.handle_forward_packet(forward_data).await?;
|
||||
|
||||
self.send_response_packet(response_bytes, LpDataKind::Forward)
|
||||
self.send_response_packet(response_bytes, LpMessageType::Forward)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
@@ -750,7 +752,7 @@ mod tests {
|
||||
|
||||
// Send a valid packet from client side
|
||||
let LpAction::SendPacket(packet) = init_sm
|
||||
.send_data(id, LpData::new_opaque(b"foomp".to_vec()))
|
||||
.send_data(id, LpMessage::new_opaque(b"foomp".to_vec()))
|
||||
.unwrap()
|
||||
else {
|
||||
panic!("illegal state")
|
||||
@@ -786,7 +788,7 @@ mod tests {
|
||||
let (mut stream, _) = listener.accept().await.unwrap();
|
||||
|
||||
let LpAction::SendPacket(packet) = resp_sm
|
||||
.send_data(id, LpData::new_opaque(b"foomp".to_vec()))
|
||||
.send_data(id, LpMessage::new_opaque(b"foomp".to_vec()))
|
||||
.unwrap()
|
||||
else {
|
||||
panic!("illegal state")
|
||||
|
||||
@@ -5,7 +5,8 @@
|
||||
|
||||
use nym_lp::LpError;
|
||||
use nym_lp::packet::MalformedLpPacketError;
|
||||
use nym_lp::state_machine::{LpAction, LpDataKind};
|
||||
use nym_lp::packet::message::LpMessageType;
|
||||
use nym_lp::state_machine::LpAction;
|
||||
use nym_lp::transport::LpTransportError;
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -45,7 +46,7 @@ pub enum LpClientError {
|
||||
MalformedLpPacket(#[from] MalformedLpPacketError),
|
||||
|
||||
#[error("received payload type of an unexpected type: {typ:?}")]
|
||||
UnexpectedLpPayload { typ: LpDataKind },
|
||||
UnexpectedLpPayload { typ: LpMessageType },
|
||||
|
||||
#[error("timed out while attempting to finish the KKT/PSQ handshake")]
|
||||
HandshakeTimeout,
|
||||
|
||||
@@ -4,23 +4,24 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use crate::LpClientError;
|
||||
use nym_lp::packet::ForwardPacketData;
|
||||
use nym_lp::packet::message::LpMessageType;
|
||||
use nym_lp::packet::{ForwardPacketData, LpMessage};
|
||||
use nym_lp::peer::LpRemotePeer;
|
||||
use nym_lp::state_machine::{LpAction, LpData, LpDataKind, LpInput};
|
||||
use nym_lp::state_machine::{LpAction, LpInput};
|
||||
use nym_registration_common::{
|
||||
LpRegistrationRequest, LpRegistrationResponse, NymNodeLPInformation,
|
||||
};
|
||||
|
||||
pub(crate) trait LpDataSendExt {
|
||||
fn to_lp_data(&self) -> Result<LpData, LpClientError>;
|
||||
fn to_lp_data(&self) -> Result<LpMessage, LpClientError>;
|
||||
}
|
||||
|
||||
pub(crate) trait LpDataDeliverExt: Sized {
|
||||
fn from_lp_data(data: LpData) -> Result<Self, LpClientError>;
|
||||
fn from_lp_data(data: LpMessage) -> Result<Self, LpClientError>;
|
||||
}
|
||||
|
||||
impl LpDataSendExt for LpRegistrationRequest {
|
||||
fn to_lp_data(&self) -> Result<LpData, LpClientError> {
|
||||
fn to_lp_data(&self) -> Result<LpMessage, LpClientError> {
|
||||
let request_bytes = self.serialise().map_err(|e| {
|
||||
LpClientError::SendRegistrationRequest(format!("Failed to serialize request: {e}"))
|
||||
})?;
|
||||
@@ -30,14 +31,14 @@ impl LpDataSendExt for LpRegistrationRequest {
|
||||
request_bytes.len()
|
||||
);
|
||||
|
||||
Ok(LpData::new_registration(request_bytes))
|
||||
Ok(LpMessage::new_registration(request_bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl LpDataDeliverExt for LpRegistrationResponse {
|
||||
fn from_lp_data(data: LpData) -> Result<Self, LpClientError> {
|
||||
if data.kind != LpDataKind::Registration {
|
||||
return Err(LpClientError::UnexpectedLpPayload { typ: data.kind });
|
||||
fn from_lp_data(data: LpMessage) -> Result<Self, LpClientError> {
|
||||
if data.kind() != LpMessageType::Registration {
|
||||
return Err(LpClientError::UnexpectedLpPayload { typ: data.kind() });
|
||||
}
|
||||
|
||||
let response = LpRegistrationResponse::try_deserialise(&data.content)
|
||||
@@ -48,7 +49,7 @@ impl LpDataDeliverExt for LpRegistrationResponse {
|
||||
}
|
||||
|
||||
impl LpDataSendExt for ForwardPacketData {
|
||||
fn to_lp_data(&self) -> Result<LpData, LpClientError> {
|
||||
fn to_lp_data(&self) -> Result<LpMessage, LpClientError> {
|
||||
let request_bytes = self.to_bytes();
|
||||
|
||||
tracing::trace!(
|
||||
@@ -56,7 +57,7 @@ impl LpDataSendExt for ForwardPacketData {
|
||||
request_bytes.len()
|
||||
);
|
||||
|
||||
Ok(LpData::new_forward(request_bytes))
|
||||
Ok(LpMessage::new_forward(request_bytes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,9 +71,9 @@ pub(crate) fn try_convert_forward_response(action: LpAction) -> Result<Vec<u8>,
|
||||
action => return Err(LpClientError::UnexpectedStateMachineAction { action }),
|
||||
};
|
||||
|
||||
if response_data.kind != LpDataKind::Forward {
|
||||
if response_data.kind() != LpMessageType::Forward {
|
||||
return Err(LpClientError::UnexpectedLpPayload {
|
||||
typ: response_data.kind,
|
||||
typ: response_data.kind(),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -25,10 +25,10 @@ use crate::lp_client::state_machine_helpers::{extract_forwarded_response, prepar
|
||||
use nym_bandwidth_controller::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
|
||||
use nym_credentials_interface::TicketType;
|
||||
use nym_crypto::asymmetric::{ed25519, x25519};
|
||||
use nym_lp::packet::EncryptedLpPacket;
|
||||
use nym_lp::packet::version;
|
||||
use nym_lp::packet::{EncryptedLpPacket, LpMessage};
|
||||
use nym_lp::peer::{DHKeyPair, LpLocalPeer, LpRemotePeer};
|
||||
use nym_lp::state_machine::{LpData, LpStateMachine};
|
||||
use nym_lp::state_machine::LpStateMachine;
|
||||
use nym_lp::transport::LpHandshakeChannel;
|
||||
use nym_lp::transport::traits::LpTransportChannel;
|
||||
use nym_lp::{Ciphersuite, KEM, LpSession};
|
||||
@@ -128,14 +128,17 @@ impl NestedLpSession {
|
||||
|
||||
/// Attempt to wrap the provided `LpData` into a `EncryptedLpPacket`
|
||||
/// using the inner state machine.
|
||||
fn prepare_transport_packet(&mut self, data: LpData) -> Result<EncryptedLpPacket> {
|
||||
fn prepare_transport_packet(&mut self, data: LpMessage) -> Result<EncryptedLpPacket> {
|
||||
let state_machine = self.state_machine_mut()?;
|
||||
prepare_send_packet(data, state_machine)
|
||||
}
|
||||
|
||||
/// Attempt to recover received `LpData` from the received `EncryptedLpPacket`
|
||||
/// using the inner state machine.
|
||||
fn extract_forwarded_response(&mut self, response_packet: EncryptedLpPacket) -> Result<LpData> {
|
||||
fn extract_forwarded_response(
|
||||
&mut self,
|
||||
response_packet: EncryptedLpPacket,
|
||||
) -> Result<LpMessage> {
|
||||
let state_machine = self.state_machine_mut()?;
|
||||
extract_forwarded_response(response_packet, state_machine)
|
||||
}
|
||||
|
||||
@@ -2,13 +2,14 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::LpClientError;
|
||||
use nym_lp::state_machine::{LpAction, LpData, LpInput};
|
||||
use nym_lp::packet::LpMessage;
|
||||
use nym_lp::state_machine::{LpAction, LpInput};
|
||||
use nym_lp::{LpStateMachine, packet::EncryptedLpPacket};
|
||||
|
||||
/// Attempt to prepare the provided data for sending by wrapping it in appropriate `LpAction`,
|
||||
/// and attempting to extract `EncryptedLpPacket` from the provided state machine.
|
||||
pub(crate) fn prepare_send_packet(
|
||||
data: LpData,
|
||||
data: LpMessage,
|
||||
state_machine: &mut LpStateMachine,
|
||||
) -> Result<EncryptedLpPacket, LpClientError> {
|
||||
let action = state_machine
|
||||
@@ -26,7 +27,7 @@ pub(crate) fn prepare_send_packet(
|
||||
pub(crate) fn extract_forwarded_response(
|
||||
response_packet: EncryptedLpPacket,
|
||||
state_machine: &mut LpStateMachine,
|
||||
) -> Result<LpData, LpClientError> {
|
||||
) -> Result<LpMessage, LpClientError> {
|
||||
let action = state_machine
|
||||
.process_input(LpInput::ReceivePacket(response_packet))
|
||||
.ok_or(LpClientError::UnexpectedStateMachineHalt)??;
|
||||
|
||||
@@ -0,0 +1,217 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import json
|
||||
import pathlib
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
def dependency_section(dep):
|
||||
kind = dep.get("kind") or "normal"
|
||||
section = {
|
||||
"normal": "dependencies",
|
||||
"dev": "dev-dependencies",
|
||||
"build": "build-dependencies",
|
||||
}.get(kind, f"{kind}-dependencies")
|
||||
target = dep.get("target")
|
||||
if target:
|
||||
return f"target.{target}.{section}"
|
||||
return section
|
||||
|
||||
|
||||
def manifest_member(root, manifest_path):
|
||||
manifest_parent = pathlib.Path(manifest_path).resolve().parent
|
||||
try:
|
||||
return str(manifest_parent.relative_to(root))
|
||||
except ValueError:
|
||||
return str(manifest_parent)
|
||||
|
||||
|
||||
def publish_status(pkg):
|
||||
publish = pkg.get("publish")
|
||||
if publish is None:
|
||||
return True, "publishable to crates.io"
|
||||
|
||||
if isinstance(publish, list):
|
||||
if not publish:
|
||||
return False, "publish disabled (`publish = false`)"
|
||||
if "crates-io" in publish:
|
||||
return True, "publishable to crates.io"
|
||||
registries = ", ".join(publish)
|
||||
return False, f"publish restricted to non-crates.io registries ({registries})"
|
||||
|
||||
return False, f"unrecognized `publish` setting: {publish!r}"
|
||||
|
||||
|
||||
def main():
|
||||
root = pathlib.Path(".").resolve()
|
||||
metadata = json.loads(
|
||||
subprocess.check_output(
|
||||
["cargo", "metadata", "--no-deps", "--format-version", "1"],
|
||||
text=True,
|
||||
)
|
||||
)
|
||||
packages_by_id = {pkg["id"]: pkg for pkg in metadata["packages"]}
|
||||
workspace_ids = set(metadata["workspace_members"])
|
||||
workspace_packages = [
|
||||
packages_by_id[pkg_id] for pkg_id in workspace_ids if pkg_id in packages_by_id
|
||||
]
|
||||
workspace_by_name = {pkg["name"]: pkg for pkg in workspace_packages}
|
||||
workspace_dir_to_name = {
|
||||
str(pathlib.Path(pkg["manifest_path"]).resolve().parent): pkg["name"]
|
||||
for pkg in workspace_packages
|
||||
}
|
||||
|
||||
package_info = {}
|
||||
for pkg in workspace_packages:
|
||||
name = pkg["name"]
|
||||
member = manifest_member(root, pkg["manifest_path"])
|
||||
explicitly_publishable, publish_reason = publish_status(pkg)
|
||||
package_info[name] = {
|
||||
"pkg": pkg,
|
||||
"member": member,
|
||||
"explicitly_publishable": explicitly_publishable,
|
||||
"publish_reason": publish_reason,
|
||||
}
|
||||
|
||||
direct_issues = defaultdict(set)
|
||||
workspace_deps = defaultdict(list)
|
||||
|
||||
for name, info in package_info.items():
|
||||
pkg = info["pkg"]
|
||||
member = info["member"]
|
||||
explicitly_publishable = info["explicitly_publishable"]
|
||||
|
||||
if not explicitly_publishable:
|
||||
direct_issues[name].add(info["publish_reason"])
|
||||
continue
|
||||
|
||||
for field in ("description", "license", "repository"):
|
||||
value = pkg.get(field)
|
||||
if not isinstance(value, str) or not value.strip():
|
||||
direct_issues[name].add(f"missing required field '{field}'")
|
||||
|
||||
for dep in pkg.get("dependencies", []):
|
||||
section = dependency_section(dep)
|
||||
dep_name = dep["name"]
|
||||
dep_source = dep.get("source")
|
||||
|
||||
dep_workspace_name = workspace_by_name.get(dep_name, {}).get("name")
|
||||
dep_path = dep.get("path")
|
||||
if dep_workspace_name is None and dep_path:
|
||||
dep_workspace_name = workspace_dir_to_name.get(
|
||||
str(pathlib.Path(dep_path).resolve())
|
||||
)
|
||||
|
||||
if dep_path and dep.get("req") in ("*", ""):
|
||||
direct_issues[name].add(
|
||||
f"{section}: path dependency '{dep_name}' has no explicit version ({dep_path})"
|
||||
)
|
||||
|
||||
if dep_workspace_name:
|
||||
dep_info = package_info[dep_workspace_name]
|
||||
if not dep_info["explicitly_publishable"]:
|
||||
direct_issues[name].add(
|
||||
f"{section}: depends on non-publishable workspace crate '{dep_workspace_name}' ({dep_info['publish_reason']})"
|
||||
)
|
||||
continue
|
||||
workspace_deps[name].append((dep_workspace_name, section))
|
||||
continue
|
||||
|
||||
if dep_source and not dep_source.startswith("registry+"):
|
||||
direct_issues[name].add(
|
||||
f"{section}: non-registry dependency '{dep_name}' from '{dep_source}'"
|
||||
)
|
||||
|
||||
effective_issues = {}
|
||||
|
||||
def collect_effective_issues(crate_name, stack):
|
||||
cached = effective_issues.get(crate_name)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
issues = set(direct_issues.get(crate_name, set()))
|
||||
stack = stack | {crate_name}
|
||||
|
||||
for dep_name, dep_section in workspace_deps.get(crate_name, []):
|
||||
dep_info = package_info[dep_name]
|
||||
if not dep_info["explicitly_publishable"]:
|
||||
issues.add(
|
||||
f"{dep_section}: depends on non-publishable workspace crate '{dep_name}' ({dep_info['publish_reason']})"
|
||||
)
|
||||
continue
|
||||
|
||||
if dep_name in stack:
|
||||
continue
|
||||
|
||||
dep_issues = collect_effective_issues(dep_name, stack)
|
||||
if dep_issues:
|
||||
issues.add(
|
||||
f"{dep_section}: depends on blocked workspace crate '{dep_name}'"
|
||||
)
|
||||
|
||||
effective_issues[crate_name] = issues
|
||||
return issues
|
||||
|
||||
for crate_name in package_info:
|
||||
collect_effective_issues(crate_name, set())
|
||||
|
||||
publish_targets = sorted(
|
||||
name for name, info in package_info.items() if info["explicitly_publishable"]
|
||||
)
|
||||
root_blockers = sorted(
|
||||
name
|
||||
for name in publish_targets
|
||||
if direct_issues.get(name)
|
||||
)
|
||||
transitive_blocked = sorted(
|
||||
name
|
||||
for name in publish_targets
|
||||
if not direct_issues.get(name) and effective_issues.get(name)
|
||||
)
|
||||
|
||||
disabled_by_config = sorted(
|
||||
name for name, info in package_info.items() if not info["explicitly_publishable"]
|
||||
)
|
||||
|
||||
print("Publishability preflight report:")
|
||||
print(f"- workspace crates inspected: {len(package_info)}")
|
||||
print(f"- crates configured for crates.io publish: {len(publish_targets)}")
|
||||
print(f"- root blockers (direct issues): {len(root_blockers)}")
|
||||
print(f"- downstream blocked crates (transitive): {len(transitive_blocked)}")
|
||||
print(f"- crates excluded by config (publish = false / restricted): {len(disabled_by_config)}")
|
||||
|
||||
if root_blockers:
|
||||
print("\nAction required: root blockers")
|
||||
for crate_name in root_blockers:
|
||||
info = package_info[crate_name]
|
||||
print(f"- {crate_name} ({info['member']})")
|
||||
for issue in sorted(direct_issues[crate_name]):
|
||||
print(f" - {issue}")
|
||||
|
||||
if transitive_blocked:
|
||||
print("\nDownstream blocked crates")
|
||||
print("- These crates have no direct issue; they are blocked by dependencies listed below.")
|
||||
for crate_name in transitive_blocked:
|
||||
info = package_info[crate_name]
|
||||
blockers = set()
|
||||
for dep_name, dep_section in workspace_deps.get(crate_name, []):
|
||||
dep_info = package_info[dep_name]
|
||||
if not dep_info["explicitly_publishable"] or effective_issues.get(dep_name):
|
||||
blockers.add(f"{dep_name} via {dep_section}")
|
||||
|
||||
print(f"- {crate_name} ({info['member']})")
|
||||
for blocker in sorted(blockers):
|
||||
print(f" - blocked by {blocker}")
|
||||
|
||||
if root_blockers or transitive_blocked:
|
||||
print("\nPreflight checks failed:")
|
||||
print(f"- {len(root_blockers) + len(transitive_blocked)} crate(s) configured for crates.io publish are blocked.")
|
||||
sys.exit(1)
|
||||
|
||||
print("\nPreflight checks passed.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user