Compare commits

..

1 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 658ac4034a Test wg crate 2023-11-28 19:14:19 +01:00
20 changed files with 251 additions and 1154 deletions
Generated
+75 -7
View File
@@ -2337,6 +2337,24 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "defguard_wireguard_rs"
version = "0.3.0"
dependencies = [
"base64 0.21.4",
"libc",
"log",
"netlink-packet-core 0.7.0",
"netlink-packet-generic",
"netlink-packet-route 0.17.1",
"netlink-packet-utils",
"netlink-packet-wireguard",
"netlink-sys",
"nix 0.27.1",
"serde",
"thiserror",
]
[[package]]
name = "der"
version = "0.6.1"
@@ -5694,6 +5712,29 @@ dependencies = [
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-core"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4"
dependencies = [
"anyhow",
"byteorder",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-generic"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd7eb8ad331c84c6b8cb7f685b448133e5ad82e1ffd5acafac374af4a5a308b"
dependencies = [
"anyhow",
"byteorder",
"netlink-packet-core 0.7.0",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-route"
version = "0.12.0"
@@ -5704,7 +5745,21 @@ dependencies = [
"bitflags 1.3.2",
"byteorder",
"libc",
"netlink-packet-core",
"netlink-packet-core 0.4.2",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-route"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053998cea5a306971f88580d0829e90f270f940befd7cf928da179d4187a5a66"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"byteorder",
"libc",
"netlink-packet-core 0.7.0",
"netlink-packet-utils",
]
@@ -5720,6 +5775,20 @@ dependencies = [
"thiserror",
]
[[package]]
name = "netlink-packet-wireguard"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b25b050ff1f6a1e23c6777b72db22790fe5b6b5ccfd3858672587a79876c8f"
dependencies = [
"anyhow",
"byteorder",
"libc",
"log",
"netlink-packet-generic",
"netlink-packet-utils",
]
[[package]]
name = "netlink-proto"
version = "0.10.0"
@@ -5729,7 +5798,7 @@ dependencies = [
"bytes",
"futures",
"log",
"netlink-packet-core",
"netlink-packet-core 0.4.2",
"netlink-sys",
"thiserror",
"tokio",
@@ -5781,6 +5850,7 @@ dependencies = [
"bitflags 2.4.1",
"cfg-if",
"libc",
"memoffset 0.9.0",
]
[[package]]
@@ -6482,6 +6552,7 @@ dependencies = [
"clap 4.4.7",
"colored",
"dashmap",
"defguard_wireguard_rs",
"dirs 4.0.0",
"dotenvy",
"futures",
@@ -6608,9 +6679,7 @@ dependencies = [
"bincode",
"bytes",
"nym-sphinx",
"rand 0.8.5",
"serde",
"thiserror",
]
[[package]]
@@ -6619,7 +6688,6 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"dashmap",
"etherparse",
"futures",
"log",
@@ -6636,7 +6704,6 @@ dependencies = [
"nym-tun",
"nym-wireguard",
"nym-wireguard-types",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
@@ -7561,6 +7628,7 @@ dependencies = [
"boringtun",
"bytes",
"dashmap",
"defguard_wireguard_rs",
"etherparse",
"futures",
"ip_network",
@@ -9224,7 +9292,7 @@ checksum = "322c53fd76a18698f1c27381d58091de3a043d356aa5bd0d510608b565f469a0"
dependencies = [
"futures",
"log",
"netlink-packet-route",
"netlink-packet-route 0.12.0",
"netlink-proto",
"nix 0.24.3",
"thiserror",
@@ -3,7 +3,6 @@
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Decimal;
use cosmwasm_std::OverflowError;
use cosmwasm_std::Uint128;
use serde::de::Error;
use serde::{Deserialize, Deserializer};
@@ -72,10 +71,6 @@ impl Percent {
// we know the cast from u128 to u8 is a safe one since the internal value must be within 0 - 1 range
truncate_decimal(hundred * self.0).u128() as u8
}
pub fn checked_pow(&self, exp: u32) -> Result<Self, OverflowError> {
self.0.checked_pow(exp).map(Percent)
}
}
impl Display for Percent {
+25 -5
View File
@@ -366,7 +366,11 @@ pub fn creating_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -396,7 +400,11 @@ pub fn verifying_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -428,7 +436,11 @@ pub fn creating_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -466,7 +478,11 @@ pub fn verifying_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -529,7 +545,11 @@ pub fn share_encryption_100(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
c.bench_function("100 shares encryption", |b| {
b.iter(|| black_box(encrypt_shares(&remote_share_key_pairs, &params, &mut rng)))
-2
View File
@@ -14,6 +14,4 @@ license.workspace = true
bincode = "1.3.3"
bytes = "1.5.0"
nym-sphinx = { path = "../nymsphinx" }
rand = "0.8.5"
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
+16 -349
View File
@@ -1,308 +1,35 @@
use std::net::IpAddr;
use nym_sphinx::addressing::clients::Recipient;
use serde::{Deserialize, Serialize};
pub const CURRENT_VERSION: u8 = 1;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
#[derive(serde::Serialize, serde::Deserialize)]
pub struct TaggedIpPacket {
pub packet: bytes::Bytes,
pub return_address: nym_sphinx::addressing::clients::Recipient,
pub return_mix_hops: Option<u8>,
// pub return_mix_delays: Option<f64>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpPacketRequest {
pub version: u8,
pub data: IpPacketRequestData,
}
impl IpPacketRequest {
pub fn new_static_connect_request(
ip: IpAddr,
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
) -> (Self, u64) {
let request_id = generate_random();
(
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::StaticConnect(StaticConnectRequest {
request_id,
ip,
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
}),
},
request_id,
)
}
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
) -> (Self, u64) {
let request_id = generate_random();
(
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::DynamicConnect(DynamicConnectRequest {
request_id,
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
}),
},
request_id,
)
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::Data(DataRequest { ip_packet }),
impl TaggedIpPacket {
pub fn new(
packet: bytes::Bytes,
return_address: nym_sphinx::addressing::clients::Recipient,
return_mix_hops: Option<u8>,
) -> Self {
TaggedIpPacket {
packet,
return_address,
return_mix_hops,
}
}
pub fn id(&self) -> Option<u64> {
match &self.data {
IpPacketRequestData::StaticConnect(request) => Some(request.request_id),
IpPacketRequestData::DynamicConnect(request) => Some(request.request_id),
IpPacketRequestData::Data(_) => None,
}
}
pub fn recipient(&self) -> Option<&Recipient> {
match &self.data {
IpPacketRequestData::StaticConnect(request) => Some(&request.reply_to),
IpPacketRequestData::DynamicConnect(request) => Some(&request.reply_to),
IpPacketRequestData::Data(_) => None,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum IpPacketRequestData {
StaticConnect(StaticConnectRequest),
DynamicConnect(DynamicConnectRequest),
Data(DataRequest),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StaticConnectRequest {
pub request_id: u64,
pub ip: IpAddr,
pub reply_to: Recipient,
pub reply_to_hops: Option<u8>,
pub reply_to_avg_mix_delays: Option<f64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DynamicConnectRequest {
pub request_id: u64,
pub reply_to: Recipient,
pub reply_to_hops: Option<u8>,
pub reply_to_avg_mix_delays: Option<f64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DataRequest {
pub ip_packet: bytes::Bytes,
}
// ---
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpPacketResponse {
pub version: u8,
pub data: IpPacketResponseData,
}
impl IpPacketResponse {
pub fn new_static_connect_success(request_id: u64, reply_to: Recipient) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::StaticConnect(StaticConnectResponse {
request_id,
reply_to,
reply: StaticConnectResponseReply::Success,
}),
}
}
pub fn new_static_connect_failure(
request_id: u64,
reply_to: Recipient,
reason: StaticConnectFailureReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::StaticConnect(StaticConnectResponse {
request_id,
reply_to,
reply: StaticConnectResponseReply::Failure(reason),
}),
}
}
pub fn new_dynamic_connect_success(request_id: u64, reply_to: Recipient, ip: IpAddr) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::DynamicConnect(DynamicConnectResponse {
request_id,
reply_to,
reply: DynamicConnectResponseReply::Success(DynamicConnectSuccess { ip }),
}),
}
}
pub fn new_dynamic_connect_failure(
request_id: u64,
reply_to: Recipient,
reason: DynamicConnectFailureReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::DynamicConnect(DynamicConnectResponse {
request_id,
reply_to,
reply: DynamicConnectResponseReply::Failure(reason),
}),
}
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::Data(DataResponse { ip_packet }),
}
}
pub fn id(&self) -> Option<u64> {
match &self.data {
IpPacketResponseData::StaticConnect(response) => Some(response.request_id),
IpPacketResponseData::DynamicConnect(response) => Some(response.request_id),
IpPacketResponseData::Data(_) => None,
}
}
pub fn recipient(&self) -> Option<&Recipient> {
match &self.data {
IpPacketResponseData::StaticConnect(response) => Some(&response.reply_to),
IpPacketResponseData::DynamicConnect(response) => Some(&response.reply_to),
IpPacketResponseData::Data(_) => None,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum IpPacketResponseData {
StaticConnect(StaticConnectResponse),
DynamicConnect(DynamicConnectResponse),
Data(DataResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StaticConnectResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: StaticConnectResponseReply,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StaticConnectResponseReply {
Success,
Failure(StaticConnectFailureReason),
}
impl StaticConnectResponseReply {
pub fn is_success(&self) -> bool {
match self {
StaticConnectResponseReply::Success => true,
StaticConnectResponseReply::Failure(_) => false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum StaticConnectFailureReason {
#[error("requested ip address is already in use")]
RequestedIpAlreadyInUse,
#[error("requested nym-address is already in use")]
RequestedNymAddressAlreadyInUse,
#[error("{0}")]
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DynamicConnectResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: DynamicConnectResponseReply,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DynamicConnectResponseReply {
Success(DynamicConnectSuccess),
Failure(DynamicConnectFailureReason),
}
impl DynamicConnectResponseReply {
pub fn is_success(&self) -> bool {
match self {
DynamicConnectResponseReply::Success(_) => true,
DynamicConnectResponseReply::Failure(_) => false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DynamicConnectSuccess {
pub ip: IpAddr,
}
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum DynamicConnectFailureReason {
#[error("requested nym-address is already in use")]
RequestedNymAddressAlreadyInUse,
#[error("no available ip address")]
NoAvailableIp,
#[error("{0}")]
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataResponse {
pub ip_packet: bytes::Bytes,
}
fn make_bincode_serializer() -> impl bincode::Options {
@@ -311,63 +38,3 @@ fn make_bincode_serializer() -> impl bincode::Options {
.with_big_endian()
.with_varint_encoding()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn check_size_of_request() {
let connect = IpPacketRequest {
version: 4,
data: IpPacketRequestData::StaticConnect(
StaticConnectRequest {
request_id: 123,
ip: IpAddr::from([10, 0, 0, 1]),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
reply_to_hops: None,
reply_to_avg_mix_delays: None,
},
)
};
assert_eq!(connect.to_bytes().unwrap().len(), 107);
}
#[test]
fn check_size_of_data() {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1u8; 32]),
}),
};
assert_eq!(data.to_bytes().unwrap().len(), 35);
}
#[test]
fn serialize_and_deserialize_data_request() {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
}),
};
let serialized = data.to_bytes().unwrap();
let deserialized = IpPacketRequest::from_reconstructed_message(
&nym_sphinx::receiver::ReconstructedMessage {
message: serialized,
sender_tag: None,
},
)
.unwrap();
assert_eq!(deserialized.version, 4);
assert_eq!(
deserialized.data,
IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
})
);
}
}
+5 -27
View File
@@ -6,10 +6,9 @@ use criterion::{criterion_group, criterion_main, Criterion};
use ff::Field;
use group::{Curve, Group};
use nym_coconut::{
aggregate_signature_shares, aggregate_verification_keys, blind_sign, prepare_blind_sign,
prove_bandwidth_credential, setup, ttp_keygen, verify_credential,
verify_partial_blind_signature, Attribute, BlindedSignature, Parameters, Signature,
SignatureShare, VerificationKey,
aggregate_signature_shares, aggregate_verification_keys, blind_sign, elgamal_keygen,
prepare_blind_sign, prove_bandwidth_credential, setup, ttp_keygen, verify_credential,
Attribute, BlindedSignature, Parameters, Signature, SignatureShare, VerificationKey,
};
use rand::seq::SliceRandom;
use std::ops::Neg;
@@ -176,6 +175,8 @@ fn bench_coconut(c: &mut Criterion) {
let binding_number = params.random_scalar();
let private_attributes = vec![serial_number, binding_number];
let _elgamal_keypair = elgamal_keygen(&params);
// The prepare blind sign is performed by the user
let (pedersen_commitments_openings, blind_sign_request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
@@ -241,29 +242,6 @@ fn bench_coconut(c: &mut Criterion) {
.map(|keypair| keypair.verification_key())
.collect();
// verify a random partial blind signature
let rand_idx = 1;
let random_blind_signature = blinded_signatures.get(rand_idx).unwrap();
let partial_verification_key = verification_keys.get(rand_idx).unwrap();
group.bench_function(
&format!(
"verify_partial_blind_signature_{}_private_attributes_{}_public_attributes",
case.num_private_attrs, case.num_public_attrs
),
|b| {
b.iter(|| {
verify_partial_blind_signature(
&params,
&blind_sign_request,
&public_attributes,
random_blind_signature,
partial_verification_key,
)
})
},
);
// Lets bench worse case, ie aggregating all
let indices: Vec<u64> = (1..=case.num_authorities).collect();
// aggregate verification keys
-1
View File
@@ -14,7 +14,6 @@ pub use scheme::aggregation::aggregate_signature_shares;
pub use scheme::aggregation::aggregate_verification_keys;
pub use scheme::issuance::blind_sign;
pub use scheme::issuance::prepare_blind_sign;
pub use scheme::issuance::verify_partial_blind_signature;
pub use scheme::issuance::BlindSignRequest;
pub use scheme::keygen::ttp_keygen;
pub use scheme::keygen::KeyPair;
+2 -172
View File
@@ -3,14 +3,12 @@
use std::convert::TryFrom;
use std::convert::TryInto;
use std::ops::Neg;
use bls12_381::{multi_miller_loop, G1Affine, G1Projective, G2Prepared, Scalar};
use group::{Curve, Group, GroupEncoding};
use bls12_381::{G1Affine, G1Projective, Scalar};
use group::{Curve, GroupEncoding};
use crate::error::{CoconutError, Result};
use crate::proofs::ProofCmCs;
use crate::scheme::keygen::VerificationKey;
use crate::scheme::setup::Parameters;
use crate::scheme::BlindedSignature;
use crate::scheme::SecretKey;
@@ -320,96 +318,6 @@ pub fn blind_sign(
Ok(BlindedSignature(h, sig))
}
/// Verifies a partial blind signature using the provided parameters and validator's verification key.
///
/// # Arguments
///
/// * `params` - A reference to the cryptographic parameters.
/// * `blind_sign_request` - A reference to the blind signature request signed by the client.
/// * `public_attributes` - A reference to the public attributes included in the client's request.
/// * `blind_sig` - A reference to the issued partial blinded signature to be verified.
/// * `partial_verification_key` - A reference to the validator's partial verification key.
///
/// # Returns
///
/// A boolean indicating whether the partial blind signature is valid (`true`) or not (`false`).
///
/// # Remarks
///
/// This function verifies the correctness and validity of a partial blind signature using
/// the provided cryptographic parameters, blind signature request, blinded signature,
/// and partial verification key.
/// It calculates pairings based on the provided values and checks whether the partial blind signature
/// is consistent with the verification key and commitments in the blind signature request.
/// The function returns `true` if the partial blind signature is valid, and `false` otherwise.
pub fn verify_partial_blind_signature(
params: &Parameters,
blind_sign_request: &BlindSignRequest,
public_attributes: &[Attribute],
blind_sig: &BlindedSignature,
partial_verification_key: &VerificationKey,
) -> bool {
let num_private_attributes = blind_sign_request.private_attributes_commitments.len();
if num_private_attributes + public_attributes.len() > partial_verification_key.beta_g2.len() {
return false;
}
// TODO: we're losing some memory here due to extra allocation,
// but worst-case scenario (given SANE amount of attributes), it's just few kb at most
let c_neg = blind_sig.1.to_affine().neg();
let g2_prep = params.prepared_miller_g2();
let mut terms = vec![
// (c^{-1}, g2)
(c_neg, g2_prep.clone()),
// (s, alpha)
(
blind_sig.0.to_affine(),
G2Prepared::from(partial_verification_key.alpha.to_affine()),
),
];
// for each private attribute, add (cm_i, beta_i) to the miller terms
for (private_attr_commit, beta_g2) in blind_sign_request
.private_attributes_commitments
.iter()
.zip(&partial_verification_key.beta_g2)
{
// (cm_i, beta_i)
terms.push((
private_attr_commit.to_affine(),
G2Prepared::from(beta_g2.to_affine()),
))
}
// for each public attribute, add (s^pub_j, beta_{priv + j}) to the miller terms
for (pub_attr, beta_g2) in public_attributes.iter().zip(
partial_verification_key
.beta_g2
.iter()
.skip(num_private_attributes),
) {
// (s^pub_j, beta_j)
terms.push((
(blind_sig.0 * pub_attr).to_affine(),
G2Prepared::from(beta_g2.to_affine()),
))
}
// get the references to all the terms to get the arguments the miller loop expects
let terms_refs = terms.iter().map(|(g1, g2)| (g1, g2)).collect::<Vec<_>>();
// since checking whether e(a, b) == e(c, d)
// is equivalent to checking e(a, b) • e(c, d)^{-1} == id
// and thus to e(a, b) • e(c^{-1}, d) == id
//
// compute e(c^1, g2) • e(s, alpha) • e(cm_0, beta_0) • e(cm_i, beta_i) • (s^pub_0, beta_{i+1}) (s^pub_j, beta_{i + j})
multi_miller_loop(&terms_refs)
.final_exponentiation()
.is_identity()
.into()
}
#[cfg(test)]
pub fn sign(
params: &mut Parameters,
@@ -446,7 +354,6 @@ pub fn sign(
#[cfg(test)]
mod tests {
use super::*;
use crate::scheme::keygen::keygen;
#[test]
fn blind_sign_request_bytes_roundtrip() {
@@ -478,81 +385,4 @@ mod tests {
lambda
);
}
#[test]
fn successful_verify_partial_blind_signature() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let public_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
let validator_keypair = keygen(&params);
let blind_sig = blind_sign(
&params,
&validator_keypair.secret_key(),
&request,
&public_attributes,
)
.unwrap();
assert!(verify_partial_blind_signature(
&params,
&request,
&public_attributes,
&blind_sig,
&validator_keypair.verification_key()
));
}
#[test]
fn successful_verify_partial_blind_signature_no_public_attributes() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &[]).unwrap();
let validator_keypair = keygen(&params);
let blind_sig =
blind_sign(&params, &validator_keypair.secret_key(), &request, &[]).unwrap();
assert!(verify_partial_blind_signature(
&params,
&request,
&[],
&blind_sig,
&validator_keypair.verification_key()
));
}
#[test]
fn fail_verify_partial_blind_signature_with_wrong_key() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let public_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
let validator_keypair = keygen(&params);
let validator2_keypair = keygen(&params);
let blind_sig = blind_sign(
&params,
&validator_keypair.secret_key(),
&request,
&public_attributes,
)
.unwrap();
// this assertion should fail, as we try to verify with a wrong validator key
assert!(!verify_partial_blind_signature(
&params,
&request,
&public_attributes,
&blind_sig,
&validator2_keypair.verification_key()
),);
}
}
+16 -53
View File
@@ -180,10 +180,10 @@ impl TunDevice {
);
// TODO: expire old entries
// #[allow(irrefutable_let_patterns)]
// if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
// nat_table.nat_table.insert(src_addr, tag);
// }
#[allow(irrefutable_let_patterns)]
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
nat_table.nat_table.insert(src_addr, tag);
}
timeout(
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
@@ -196,13 +196,11 @@ impl TunDevice {
// Receive reponse packets from the wild internet
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
// let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
// log::debug!(
// "iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
// packet.len(),
// );
let dst_addr =
parse_dst_addr(packet).ok_or(TunDeviceError::UnableToParseAddressIpHeaderMissing)?;
let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
log::debug!(
"iface: read Packet({src_addr} -> {dst_addr}, {} bytes)",
packet.len(),
);
// Route packet to the correct peer.
@@ -222,14 +220,13 @@ impl TunDevice {
// But we can also do it by consulting the NAT table.
RoutingMode::Nat(ref nat_table) => {
// if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
// log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
// .try_send((*tag, packet.to_vec()))
.try_send((0, packet.to_vec()))
.map_err(|err| err.into());
// }
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
.try_send((*tag, packet.to_vec()))
.map_err(|err| err.into());
}
}
}
@@ -290,37 +287,3 @@ fn parse_src_dst_address(packet: &[u8]) -> Result<ParsedAddresses, TunDeviceErro
None => Err(TunDeviceError::UnableToParseAddressIpHeaderMissing),
}
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}
+1
View File
@@ -20,6 +20,7 @@ bincode = "1.3.3"
#boringtun = "0.6.0"
boringtun = { workspace = true }
bytes = "1.5.0"
defguard_wireguard_rs = { path = "../../../wireguard-rs" }
dashmap = "5.5.3"
etherparse = "0.13.0"
futures = "0.3.28"
+26 -66
View File
@@ -17,74 +17,34 @@ use std::sync::Arc;
#[cfg(target_os = "linux")]
use nym_tun::tun_device;
use defguard_wireguard_rs::{host::Peer, InterfaceConfiguration, WGApi, WireguardInterfaceApi};
use nym_network_defaults::{WG_PORT, WG_TUN_DEVICE_ADDRESS};
use nym_tun::tun_task_channel;
use setup::PRIVATE_KEY;
/// Start wireguard UDP listener and TUN device
///
/// # Errors
///
/// This function will return an error if either the UDP listener of the TUN device fails to start.
#[cfg(target_os = "linux")]
/// Start wireguard device
pub async fn start_wireguard(
task_client: nym_task::TaskClient,
gateway_client_registry: Arc<GatewayClientRegistry>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
// TODO: make this configurable
// We can optionally index peers by their IP like standard wireguard. If we don't then we do
// plain NAT where we match incoming destination IP with outgoing source IP.
use nym_wireguard_types::tun_common::network_table::NetworkTable;
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(NetworkTable::new()));
// Alternative 1:
let routing_mode = tun_device::RoutingMode::new_allowed_ips(peers_by_ip.clone());
// Alternative 2:
//let routing_mode = tun_device::RoutingMode::new_nat();
// Start the tun device that is used to relay traffic outbound
let config = tun_device::TunDeviceConfig {
base_name: nym_network_defaults::WG_TUN_BASE_NAME.to_string(),
ip: nym_network_defaults::WG_TUN_DEVICE_ADDRESS.parse().unwrap(),
netmask: nym_network_defaults::WG_TUN_DEVICE_NETMASK.parse().unwrap(),
};
let (tun, tun_task_tx, tun_task_response_rx) = tun_device::TunDevice::new(routing_mode, config);
tun.start();
// We also index peers by a tag
let peers_by_tag = Arc::new(tokio::sync::Mutex::new(wg_tunnel::PeersByTag::new()));
// If we want to have the tun device on a separate host, it's the tun_task and
// tun_task_response channels that needs to be sent over the network to the host where the tun
// device is running.
// The packet relayer's responsibility is to route packets between the correct tunnel and the
// tun device. The tun device may or may not be on a separate host, which is why we can't do
// this routing in the tun device itself.
let (packet_relayer, packet_tx) = packet_relayer::PacketRelayer::new(
tun_task_tx.clone(),
tun_task_response_rx,
peers_by_tag.clone(),
);
packet_relayer.start();
// Start the UDP listener that clients connect to
let udp_listener = udp_listener::WgUdpListener::new(
packet_tx,
peers_by_ip,
peers_by_tag,
Arc::clone(&gateway_client_registry),
)
.await?;
udp_listener.start(task_client);
Ok(())
}
#[cfg(not(target_os = "linux"))]
pub async fn start_wireguard(
_task_client: nym_task::TaskClient,
mut task_client: nym_task::TaskClient,
_gateway_client_registry: Arc<GatewayClientRegistry>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
todo!("WireGuard is currently only supported on Linux")
) -> Result<WGApi, Box<dyn std::error::Error + Send + Sync + 'static>> {
let ifname = String::from("wg0");
let wgapi = WGApi::new(ifname.clone(), false)?;
wgapi.create_interface()?;
let interface_config = InterfaceConfiguration {
name: ifname.clone(),
prvkey: PRIVATE_KEY.to_string(),
address: WG_TUN_DEVICE_ADDRESS.to_string(),
port: WG_PORT as u32,
peers: vec![],
};
wgapi.configure_interface(&interface_config)?;
let peer = std::env::var("NYM_PEER_PUBLIC_KEY").expect("NYM_PEER_PUBLIC_KEY must be set");
let mut peer = Peer::new(peer.as_str().try_into().unwrap());
peer.set_allowed_ips(vec!["10.1.0.2".parse().unwrap()]);
wgapi.configure_peer(&peer)?;
wgapi.configure_peer_routing(&vec![peer.clone()])?;
tokio::spawn(async move { task_client.recv().await });
Ok(wgapi)
}
+1 -1
View File
@@ -9,7 +9,7 @@ pub const WG_ADDRESS: &str = "0.0.0.0";
// The private key of the listener
// Corresponding public key: "WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE="
const PRIVATE_KEY: &str = "AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=";
pub(crate) const PRIVATE_KEY: &str = "AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=";
// The AllowedIPs for the connected peer, which is one a single IP and the same as the IP that the
// peer has configured on their side.
+4 -2
View File
@@ -77,9 +77,11 @@ nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-wireguard = { path = "../common/wireguard", optional = true }
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
nym-wireguard = { path = "../common/wireguard", optional = true }
defguard_wireguard_rs = { path = "../../wireguard-rs", optional = true }
[dev-dependencies]
tower = "0.4.13"
rand = "0.8.5"
@@ -95,4 +97,4 @@ sqlx = { version = "0.5", features = [
] }
[features]
wireguard = ["nym-wireguard"]
wireguard = ["nym-wireguard", "defguard_wireguard_rs"]
+13 -18
View File
@@ -22,6 +22,7 @@ use crate::node::statistics::collector::GatewayStatisticsCollector;
use crate::node::storage::Storage;
use anyhow::bail;
use dashmap::DashMap;
use defguard_wireguard_rs::{WGApi, WireguardInterfaceApi};
use futures::channel::{mpsc, oneshot};
use log::*;
use nym_crypto::asymmetric::{encryption, identity};
@@ -200,11 +201,10 @@ impl<St> Gateway<St> {
mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler);
}
#[cfg(feature = "wireguard")]
async fn start_wireguard(
&self,
shutdown: TaskClient,
) -> Result<(), Box<dyn Error + Send + Sync>> {
) -> Result<WGApi, Box<dyn Error + Send + Sync>> {
// TODO: possibly we should start the UDP listener and TUN device explicitly here
nym_wireguard::start_wireguard(shutdown, Arc::clone(&self.client_registry)).await
}
@@ -476,13 +476,6 @@ impl<St> Gateway<St> {
});
}
self.start_client_websocket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shutdown.subscribe().named("websocket::Listener"),
Arc::new(coconut_verifier),
);
let nr_request_filter = if self.config.network_requester.enabled {
let embedded_nr = self
.start_network_requester(
@@ -503,7 +496,7 @@ impl<St> Gateway<St> {
if self.config.ip_packet_router.enabled {
let embedded_ip_sp = self
.start_ip_packet_router(
mix_forwarding_channel,
mix_forwarding_channel.clone(),
shutdown.subscribe().named("ip_service_provider"),
)
.await?;
@@ -520,16 +513,17 @@ impl<St> Gateway<St> {
.with_maybe_network_request_filter(nr_request_filter)
.start(shutdown.subscribe().named("http-api"))?;
// Once this is a bit more mature, make this a commandline flag instead of a compile time
// flag
#[cfg(feature = "wireguard")]
if let Err(err) = self
self.start_client_websocket_listener(
mix_forwarding_channel,
active_clients_store,
shutdown.subscribe().named("websocket::Listener"),
Arc::new(coconut_verifier),
);
let wg_api = self
.start_wireguard(shutdown.subscribe().named("wireguard"))
.await
{
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
bail!("{err}")
}
.expect("Could not start wireguard");
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
@@ -537,6 +531,7 @@ impl<St> Gateway<St> {
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
bail!("{err}")
}
wg_api.remove_interface()?;
Ok(())
}
}
@@ -24,15 +24,7 @@ struct MixnodeWithStakeAndPerformance {
impl MixnodeWithStakeAndPerformance {
fn to_selection_weight(&self) -> f64 {
let scaled_performance = match self.performance.checked_pow(20) {
Ok(perf) => perf,
Err(overflow) => {
warn!("the node's performance ({}) has overflow while scaling it by the factor of 20: {overflow}. Setting it to 0 instead.", self.performance);
return 0.;
}
};
let scaled_stake = self.total_stake * scaled_performance;
let scaled_stake = self.total_stake * self.performance;
stake_to_f64(scaled_stake)
}
}
@@ -168,7 +168,6 @@ impl MixnetClient {
}
}
#[derive(Clone)]
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
@@ -11,7 +11,6 @@ license.workspace = true
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
dashmap.workspace = true
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
@@ -28,7 +27,6 @@ nym-task = { path = "../../common/task" }
nym-tun = { path = "../../common/tun" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
rand = "0.8.5"
reqwest.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -30,9 +30,6 @@ pub enum IpPacketRouterError {
#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,
#[error("received packet has an invalid version: {0}")]
InvalidPacketVersion(u8),
#[error("failed to deserialize tagged packet: {source}")]
FailedToDeserializeTaggedPacket { source: bincode::Error },
@@ -1,38 +0,0 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
};
use crate::{ConnectedClient, TUN_DEVICE_ADDRESS};
// Find an available IP address in self.connected_clients
// TODO: make this nicer
fn generate_random_ip_within_subnet() -> Ipv4Addr {
let mut rng = rand::thread_rng();
// Generate a random number in the range 1-254
let last_octet = rand::Rng::gen_range(&mut rng, 1..=254);
Ipv4Addr::new(10, 0, 0, last_octet)
}
fn is_ip_taken(
connected_clients: &HashMap<IpAddr, ConnectedClient>,
tun_ip: Ipv4Addr,
ip: Ipv4Addr,
) -> bool {
connected_clients.contains_key(&ip.into()) || ip == tun_ip
}
// TODO: brute force approach. We could consider using a more efficient algorithm
pub(crate) fn find_new_ip(connected_clients: &HashMap<IpAddr, ConnectedClient>) -> Option<IpAddr> {
let mut new_ip = generate_random_ip_within_subnet();
let mut tries = 0;
let tun_ip = TUN_DEVICE_ADDRESS.parse::<Ipv4Addr>().unwrap();
while is_ip_taken(connected_clients, tun_ip, new_ip) {
new_ip = generate_random_ip_within_subnet();
tries += 1;
if tries > 100 {
return None;
}
}
Some(new_ip.into())
}
+66 -393
View File
@@ -1,11 +1,8 @@
#![cfg_attr(not(target_os = "linux"), allow(dead_code))]
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
path::Path,
sync::Arc,
time::Duration,
};
use error::IpPacketRouterError;
@@ -14,10 +11,6 @@ use nym_client_core::{
client::mix_traffic::transceiver::GatewayTransceiver,
config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider,
};
use nym_ip_packet_requests::{
DynamicConnectFailureReason, IpPacketRequest, IpPacketRequestData, IpPacketResponse,
StaticConnectFailureReason,
};
use nym_sdk::{
mixnet::{InputMessage, MixnetMessageSender, Recipient},
NymNetworkDetails,
@@ -32,7 +25,6 @@ pub use crate::config::Config;
pub mod config;
pub mod error;
mod generate_new_ip;
mod request_filter;
// The interface used to route traffic
@@ -40,9 +32,6 @@ pub const TUN_BASE_NAME: &str = "nymtun";
pub const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0";
const DISCONNECT_TIMER_INTERVAL: Duration = Duration::from_secs(10);
const CLIENT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
pub struct OnStartData {
// to add more fields as required
pub address: Recipient,
@@ -168,10 +157,9 @@ impl IpPacketRouterBuilder {
_config: self.config,
request_filter: request_filter.clone(),
tun_task_tx,
tun_task_response_rx: Some(tun_task_response_rx),
tun_task_response_rx,
mixnet_client,
task_handle,
connected_clients: Default::default(),
};
log::info!("The address of this client is: {self_address}");
@@ -191,173 +179,85 @@ impl IpPacketRouterBuilder {
}
}
#[cfg_attr(not(target_os = "linux"), allow(unused))]
#[allow(unused)]
struct IpPacketRouter {
_config: Config,
request_filter: request_filter::RequestFilter,
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
tun_task_response_rx: Option<nym_tun::tun_task_channel::TunTaskResponseRx>,
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
connected_clients: HashMap<IpAddr, ConnectedClient>,
}
struct ConnectedClient {
nym_address: Recipient,
last_activity: std::time::Instant,
}
#[cfg_attr(not(target_os = "linux"), allow(unused))]
#[allow(unused)]
impl IpPacketRouter {
async fn on_static_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::StaticConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received static connect request from {sender_address}",
sender_address = connect_request.reply_to
);
async fn run(mut self) -> Result<(), IpPacketRouterError> {
let mut task_client = self.task_handle.fork("main_loop");
let request_id = connect_request.request_id;
let requested_ip = connect_request.ip;
let reply_to = connect_request.reply_to;
// TODO: ignoring reply_to_hops and reply_to_avg_mix_delays for now
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpPacketRouter [main loop]: received shutdown");
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
if let Err(err) = self.on_message(msg).await {
log::error!("Error handling mixnet message: {err}");
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
};
},
packet = self.tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// Read recipient from env variable NYM_CLIENT_ADDR which is a base58
// string of the nym-address of the client that the packet should be
// sent back to.
//
// In the near future we will let the client expose it's nym-address
// directly, and after that, provide SURBS
let recipient = std::env::var("NYM_CLIENT_ADDR").ok().and_then(|addr| {
Recipient::try_from_base58_string(addr).ok()
});
// Check that the IP is available in the set of connected clients
let is_ip_taken = self.connected_clients.contains_key(&requested_ip);
// let is_ip_taken = connected_clients
// .lock()
// .unwrap()
// .contains_key(&requested_ip);
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let input_message = InputMessage::new_regular(recipient, packet, lane, packet_type);
// Check that the nym address isn't already registered
let is_nym_address_taken = self
.connected_clients
// .lock()
// .unwrap()
.values()
.any(|client| client.nym_address == reply_to);
match (is_ip_taken, is_nym_address_taken) {
(true, true) => {
log::info!("Connecting an already connected client");
// Update the last activity time for the client
if let Some(client) = self.connected_clients.get_mut(&requested_ip) {
client.last_activity = std::time::Instant::now();
} else {
log::error!("Failed to update last activity time for client");
if let Err(err) = self.mixnet_client.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("NYM_CLIENT_ADDR not set or invalid");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
}
(false, false) => {
log::info!("Connecting a new client");
self.connected_clients.insert(
requested_ip,
ConnectedClient {
nym_address: reply_to,
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(requested_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
}
(true, false) => {
log::info!("Requested IP is not available");
Ok(Some(IpPacketResponse::new_static_connect_failure(
request_id,
reply_to,
StaticConnectFailureReason::RequestedIpAlreadyInUse,
)))
}
(false, true) => {
log::info!("Nym address is already registered");
Ok(Some(IpPacketResponse::new_static_connect_failure(
request_id,
reply_to,
StaticConnectFailureReason::RequestedNymAddressAlreadyInUse,
)))
}
}
log::info!("IpPacketRouter: stopping");
Ok(())
}
async fn on_dynamic_connect_request(
async fn on_message(
&mut self,
connect_request: nym_ip_packet_requests::DynamicConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received dynamic connect request from {sender_address}",
sender_address = connect_request.reply_to
reconstructed: ReconstructedMessage,
) -> Result<(), IpPacketRouterError> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
);
let request_id = connect_request.request_id;
let reply_to = connect_request.reply_to;
// TODO: ignoring reply_to_hops and reply_to_avg_mix_delays for now
// Check if it's the same client connecting again, then we just reuse the same IP
// TODO: this is problematic. Until we sign connect requests this means you can spam people
// with return traffic
let existing_ip = self.connected_clients.iter().find_map(|(ip, client)| {
if client.nym_address == reply_to {
Some(*ip)
} else {
None
}
});
if let Some(existing_ip) = existing_ip {
log::info!("Found existing client for nym address");
// Update the last activity time for the client
if let Some(client) = self.connected_clients.get_mut(&existing_ip) {
client.last_activity = std::time::Instant::now();
} else {
log::error!("Failed to update last activity time for client");
}
return Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id,
reply_to,
existing_ip,
)));
}
let Some(new_ip) = generate_new_ip::find_new_ip(&self.connected_clients) else {
log::info!("No available IP address");
return Ok(Some(IpPacketResponse::new_dynamic_connect_failure(
request_id,
reply_to,
DynamicConnectFailureReason::NoAvailableIp,
)));
};
self.connected_clients.insert(
new_ip,
ConnectedClient {
nym_address: reply_to,
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(new_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id, reply_to, new_ip,
)))
}
async fn on_data_request(
&mut self,
data_request: nym_ip_packet_requests::DataRequest,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!("Received data request");
let tagged_packet =
nym_ip_packet_requests::TaggedIpPacket::from_reconstructed_message(&reconstructed)
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket {
source: err,
})?;
// We don't forward packets that we are not able to parse. BUT, there might be a good
// reason to still forward them.
@@ -371,20 +271,11 @@ impl IpPacketRouter {
src_addr,
dst_addr,
dst,
} = parse_packet(&data_request.ip_packet)?;
} = parse_packet(&tagged_packet.packet)?;
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
// Check if there is a connected client for this src_addr. If there is, update the last activity time
// for the client. If there isn't, drop the packet.
if let Some(client) = self.connected_clients.get_mut(&src_addr) {
client.last_activity = std::time::Instant::now();
} else {
log::info!("Dropping packet: no connected client for {src_addr}");
return Ok(None);
}
// Filter check
if let Some(dst) = dst {
if !self.request_filter.check_address(&dst).await {
@@ -397,200 +288,16 @@ impl IpPacketRouter {
log::warn!("Ignoring filter check for packet without port number! TODO!");
}
// TODO: consider changing from Vec<u8> to bytes::Bytes?
// TODO: consider just removing the tag
let tag = 0;
// TODO: set the tag correctly. Can we just reuse sender_tag?
let peer_tag = 0;
self.tun_task_tx
.try_send((tag, data_request.ip_packet.into()))
.try_send((peer_tag, tagged_packet.packet.into()))
.map_err(|err| IpPacketRouterError::FailedToSendPacketToTun { source: err })?;
Ok(None)
}
async fn on_reconstructed_message(
&mut self,
reconstructed: ReconstructedMessage,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
);
// Check version of request
if let Some(version) = reconstructed.message.first() {
// The idea is that in the future we can add logic here to parse older versions to stay
// backwards compatible.
if *version != nym_ip_packet_requests::CURRENT_VERSION {
log::warn!("Received packet with invalid version");
return Err(IpPacketRouterError::InvalidPacketVersion(*version));
}
}
let request = IpPacketRequest::from_reconstructed_message(&reconstructed)
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket { source: err })?;
match request.data {
IpPacketRequestData::StaticConnect(connect_request) => {
self.on_static_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::DynamicConnect(connect_request) => {
self.on_dynamic_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
}
}
async fn run(mut self) -> Result<(), IpPacketRouterError> {
let mut task_client = self.task_handle.fork("main_loop");
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
let mixnet_client_sender = self.mixnet_client.split_sender();
let mixnet_client_sender_clone = mixnet_client_sender.clone();
// let connected_clients = Arc::new(std::sync::Mutex::new(
// HashMap::<IpAddr, ConnectedClient>::new(),
// ));
// let connected_clients_clone = connected_clients.clone();
let tun_task_response_rx = self.tun_task_response_rx.take();
let (connected_client_tx, mut connected_client_rx) = tokio::sync::mpsc::channel(16);
// Spawn TUN listener
tokio::spawn(async move {
let mut connected_clients = HashMap::<IpAddr, ConnectedClient>::new();
let mut tun_task_response_rx = tun_task_response_rx.unwrap();
loop {
tokio::select! {
connected_client_event = connected_client_rx.recv() => {
match connected_client_event {
Some(ConnectedClientEvent::Connect(ip, nym_addr)) => {
log::trace!("Connect client: {ip}");
connected_clients.insert(ip, ConnectedClient {
nym_address: nym_addr,
last_activity: std::time::Instant::now(),
});
},
Some(ConnectedClientEvent::Disconnect(ip)) => {
log::trace!("Disconnect client: {ip}");
connected_clients.remove(&ip);
},
None => {},
}
},
packet = tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// TODO: skip full parsing since we only need dst_addr
// let Ok(ParsedPacket {
// packet_type: _,
// src_addr: _,
// dst_addr,
// dst: _,
// }) = parse_packet(&packet) else {
// log::warn!("Failed to parse packet");
// continue;
// };
let Some(dst_addr) = parse_dst_addr(&packet) else {
log::warn!("Failed to parse packet");
continue;
};
let recipient = connected_clients.get(&dst_addr).map(|c| c.nym_address);
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
if let Err(err) = mixnet_client_sender_clone.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
}
}
});
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpPacketRouter [main loop]: received shutdown");
},
_ = disconnect_timer.tick() => {
let now = std::time::Instant::now();
let inactive_clients: Vec<IpAddr> = self.connected_clients.iter()
.filter_map(|(ip, client)| {
if now.duration_since(client.last_activity) > CLIENT_INACTIVITY_TIMEOUT {
Some(*ip)
} else {
None
}
})
.collect();
for ip in inactive_clients {
log::info!("Disconnect inactive client: {ip}");
self.connected_clients.remove(&ip);
connected_client_tx.send(ConnectedClientEvent::Disconnect(ip)).await.unwrap();
}
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg, &connected_client_tx).await {
Ok(Some(response)) => {
let Some(recipient) = response.recipient() else {
log::error!("IpPacketRouter [main loop]: failed to get recipient from response");
continue;
};
let response_packet = response.to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let lane = TransmissionLane::General;
let packet_type = None;
let input_message = InputMessage::new_regular(*recipient, response_packet, lane, packet_type);
if let Err(err) = mixnet_client_sender.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
},
Ok(None) => {
continue;
},
Err(err) => {
log::error!("Error handling mixnet message: {err}");
}
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
};
},
}
}
log::info!("IpPacketRouter: stopping");
Ok(())
}
}
enum ConnectedClientEvent {
Disconnect(IpAddr),
Connect(IpAddr, Recipient),
}
struct ParsedPacket<'a> {
packet_type: &'a str,
src_addr: IpAddr,
@@ -605,8 +312,8 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
})?;
let (packet_type, dst_port) = match headers.transport {
Some(etherparse::TransportSlice::Udp(header)) => ("udp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("tcp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
@@ -686,37 +393,3 @@ async fn create_mixnet_client(
.await
.map_err(|err| IpPacketRouterError::FailedToConnectToMixnet { source: err })
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}