Compare commits

...

25 Commits

Author SHA1 Message Date
Jon Häggblad a4b880ba4a More optimized parsing 2023-11-28 10:11:41 +01:00
Jon Häggblad 76d6c51f91 try a different parse_dst_addr 2023-11-28 09:59:15 +01:00
Jon Häggblad 3a3d854eb7 Try with split state 2023-11-28 09:42:55 +01:00
Jon Häggblad 748dd1faf3 Fix typo in log 2023-11-28 09:25:05 +01:00
Jon Häggblad c2cb343135 Fix connected clients 2023-11-28 08:56:20 +01:00
Jon Häggblad 0581d40bbf Initial split out tun listener: 2023-11-28 08:43:03 +01:00
Jon Häggblad 4a8c09f476 Handle dynamic ip allocation in ip packet router (#4186)
* Add dynamic connect support to ip packet router

* Disconnect inactive clients

* Don't generate ip same as tun device

* clippy

* Extract a few functions to separate mod

* clippy
2023-11-27 15:31:35 +01:00
Jon Häggblad ca2ad13579 Handle connect request in IPR (#4183)
* wip: first attempt at connected clients

* redo connected clients

* fix bug in if statement
2023-11-27 13:18:31 +01:00
Jon Häggblad 44395f8466 Implement ip-packet-router connect request response outline (#4177)
* Try revised request response connect types

* wip: try to get data requests work

* Also serialize response

* tweak log

* Add code path for connect response fail

* Convenience functions on ip packet request

* Remove commented out code

* Restore log to debug
2023-11-27 12:17:58 +01:00
Jędrzej Stuczyński 39d714f2c0 Merge pull request #4171 from nymtech/research/partial_blind_signature_verification
Research/partial blind signature verification
2023-11-27 09:57:44 +00:00
Jon Häggblad 336f220c83 Fix some clippy for beta toolchain (#4182) 2023-11-27 10:52:18 +01:00
Tommy Verrall 6b2c13b3fd Merge pull request #4126 from nymtech/chore/adjust-selection-weights
using performance^20 when calculating active set selection weight
2023-11-27 07:46:57 +00:00
Tommy Verrall 17357da97e Merge pull request #4167 from nymtech/bugfix/gateway-ws-listener-before-NR
start gateway websocket listener before embedded NR
2023-11-24 17:59:06 +00:00
Jędrzej Stuczyński d92c8c4149 deal with the pow error during stake adjustment 2023-11-24 09:40:17 +00:00
Jędrzej Stuczyński 4c8fa74dfe remove the panic 2023-11-24 09:31:46 +00:00
Jędrzej Stuczyński b1ac5b4e86 using multimiller loop for partial blinded signature verification 2023-11-23 11:21:47 +00:00
aniampio ae22a132a3 Update benchmarks 2023-11-23 11:21:47 +00:00
aniampio d0b9583b79 Add benchmark for verification of a single partial blind signature 2023-11-23 11:21:46 +00:00
aniampio 1f5568b4a7 Extend the function to support also public attributes 2023-11-23 11:21:46 +00:00
aniampio 16f51a0014 Add documentation to the verification function 2023-11-23 11:21:46 +00:00
aniampio 5e8767c5d6 Update tests 2023-11-23 11:21:46 +00:00
aniampio 15abfb390f Make the function more generic 2023-11-23 11:21:46 +00:00
aniampio 45e5f12198 Add function to verify partial blind signature 2023-11-23 11:21:46 +00:00
Jędrzej Stuczyński 24746dc47b start gateway websocket listener before embedded NR 2023-11-21 14:32:23 +00:00
Jędrzej Stuczyński ef8e452f30 using performance^20 when calculating active set selection weight 2023-11-10 09:54:03 +00:00
16 changed files with 1080 additions and 147 deletions
Generated
+4
View File
@@ -6608,7 +6608,9 @@ dependencies = [
"bincode",
"bytes",
"nym-sphinx",
"rand 0.8.5",
"serde",
"thiserror",
]
[[package]]
@@ -6617,6 +6619,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"dashmap",
"etherparse",
"futures",
"log",
@@ -6633,6 +6636,7 @@ dependencies = [
"nym-tun",
"nym-wireguard",
"nym-wireguard-types",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
@@ -3,6 +3,7 @@
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Decimal;
use cosmwasm_std::OverflowError;
use cosmwasm_std::Uint128;
use serde::de::Error;
use serde::{Deserialize, Deserializer};
@@ -71,6 +72,10 @@ impl Percent {
// we know the cast from u128 to u8 is a safe one since the internal value must be within 0 - 1 range
truncate_decimal(hundred * self.0).u128() as u8
}
pub fn checked_pow(&self, exp: u32) -> Result<Self, OverflowError> {
self.0.checked_pow(exp).map(Percent)
}
}
impl Display for Percent {
+5 -25
View File
@@ -366,11 +366,7 @@ pub fn creating_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -400,11 +396,7 @@ pub fn verifying_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -436,11 +428,7 @@ pub fn creating_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -478,11 +466,7 @@ pub fn verifying_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
@@ -545,11 +529,7 @@ pub fn share_encryption_100(c: &mut Criterion) {
.map(|&node_index| polynomial.evaluate_at(&Scalar::from(node_index)).into())
.collect::<Vec<_>>();
let remote_share_key_pairs = shares
.iter()
.zip(receivers.values())
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let remote_share_key_pairs = shares.iter().zip(receivers.values()).collect::<Vec<_>>();
c.bench_function("100 shares encryption", |b| {
b.iter(|| black_box(encrypt_shares(&remote_share_key_pairs, &params, &mut rng)))
+2
View File
@@ -14,4 +14,6 @@ license.workspace = true
bincode = "1.3.3"
bytes = "1.5.0"
nym-sphinx = { path = "../nymsphinx" }
rand = "0.8.5"
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
+349 -16
View File
@@ -1,35 +1,308 @@
#[derive(serde::Serialize, serde::Deserialize)]
pub struct TaggedIpPacket {
pub packet: bytes::Bytes,
pub return_address: nym_sphinx::addressing::clients::Recipient,
pub return_mix_hops: Option<u8>,
// pub return_mix_delays: Option<f64>,
use std::net::IpAddr;
use nym_sphinx::addressing::clients::Recipient;
use serde::{Deserialize, Serialize};
pub const CURRENT_VERSION: u8 = 1;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
}
impl TaggedIpPacket {
pub fn new(
packet: bytes::Bytes,
return_address: nym_sphinx::addressing::clients::Recipient,
return_mix_hops: Option<u8>,
) -> Self {
TaggedIpPacket {
packet,
return_address,
return_mix_hops,
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpPacketRequest {
pub version: u8,
pub data: IpPacketRequestData,
}
impl IpPacketRequest {
pub fn new_static_connect_request(
ip: IpAddr,
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
) -> (Self, u64) {
let request_id = generate_random();
(
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::StaticConnect(StaticConnectRequest {
request_id,
ip,
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
}),
},
request_id,
)
}
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
reply_to_avg_mix_delays: Option<f64>,
) -> (Self, u64) {
let request_id = generate_random();
(
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::DynamicConnect(DynamicConnectRequest {
request_id,
reply_to,
reply_to_hops,
reply_to_avg_mix_delays,
}),
},
request_id,
)
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketRequestData::Data(DataRequest { ip_packet }),
}
}
pub fn id(&self) -> Option<u64> {
match &self.data {
IpPacketRequestData::StaticConnect(request) => Some(request.request_id),
IpPacketRequestData::DynamicConnect(request) => Some(request.request_id),
IpPacketRequestData::Data(_) => None,
}
}
pub fn recipient(&self) -> Option<&Recipient> {
match &self.data {
IpPacketRequestData::StaticConnect(request) => Some(&request.reply_to),
IpPacketRequestData::DynamicConnect(request) => Some(&request.reply_to),
IpPacketRequestData::Data(_) => None,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum IpPacketRequestData {
StaticConnect(StaticConnectRequest),
DynamicConnect(DynamicConnectRequest),
Data(DataRequest),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StaticConnectRequest {
pub request_id: u64,
pub ip: IpAddr,
pub reply_to: Recipient,
pub reply_to_hops: Option<u8>,
pub reply_to_avg_mix_delays: Option<f64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DynamicConnectRequest {
pub request_id: u64,
pub reply_to: Recipient,
pub reply_to_hops: Option<u8>,
pub reply_to_avg_mix_delays: Option<f64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DataRequest {
pub ip_packet: bytes::Bytes,
}
// ---
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpPacketResponse {
pub version: u8,
pub data: IpPacketResponseData,
}
impl IpPacketResponse {
pub fn new_static_connect_success(request_id: u64, reply_to: Recipient) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::StaticConnect(StaticConnectResponse {
request_id,
reply_to,
reply: StaticConnectResponseReply::Success,
}),
}
}
pub fn new_static_connect_failure(
request_id: u64,
reply_to: Recipient,
reason: StaticConnectFailureReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::StaticConnect(StaticConnectResponse {
request_id,
reply_to,
reply: StaticConnectResponseReply::Failure(reason),
}),
}
}
pub fn new_dynamic_connect_success(request_id: u64, reply_to: Recipient, ip: IpAddr) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::DynamicConnect(DynamicConnectResponse {
request_id,
reply_to,
reply: DynamicConnectResponseReply::Success(DynamicConnectSuccess { ip }),
}),
}
}
pub fn new_dynamic_connect_failure(
request_id: u64,
reply_to: Recipient,
reason: DynamicConnectFailureReason,
) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::DynamicConnect(DynamicConnectResponse {
request_id,
reply_to,
reply: DynamicConnectResponseReply::Failure(reason),
}),
}
}
pub fn new_ip_packet(ip_packet: bytes::Bytes) -> Self {
Self {
version: CURRENT_VERSION,
data: IpPacketResponseData::Data(DataResponse { ip_packet }),
}
}
pub fn id(&self) -> Option<u64> {
match &self.data {
IpPacketResponseData::StaticConnect(response) => Some(response.request_id),
IpPacketResponseData::DynamicConnect(response) => Some(response.request_id),
IpPacketResponseData::Data(_) => None,
}
}
pub fn recipient(&self) -> Option<&Recipient> {
match &self.data {
IpPacketResponseData::StaticConnect(response) => Some(&response.reply_to),
IpPacketResponseData::DynamicConnect(response) => Some(&response.reply_to),
IpPacketResponseData::Data(_) => None,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum IpPacketResponseData {
StaticConnect(StaticConnectResponse),
DynamicConnect(DynamicConnectResponse),
Data(DataResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StaticConnectResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: StaticConnectResponseReply,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StaticConnectResponseReply {
Success,
Failure(StaticConnectFailureReason),
}
impl StaticConnectResponseReply {
pub fn is_success(&self) -> bool {
match self {
StaticConnectResponseReply::Success => true,
StaticConnectResponseReply::Failure(_) => false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum StaticConnectFailureReason {
#[error("requested ip address is already in use")]
RequestedIpAlreadyInUse,
#[error("requested nym-address is already in use")]
RequestedNymAddressAlreadyInUse,
#[error("{0}")]
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DynamicConnectResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: DynamicConnectResponseReply,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum DynamicConnectResponseReply {
Success(DynamicConnectSuccess),
Failure(DynamicConnectFailureReason),
}
impl DynamicConnectResponseReply {
pub fn is_success(&self) -> bool {
match self {
DynamicConnectResponseReply::Success(_) => true,
DynamicConnectResponseReply::Failure(_) => false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DynamicConnectSuccess {
pub ip: IpAddr,
}
#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum DynamicConnectFailureReason {
#[error("requested nym-address is already in use")]
RequestedNymAddressAlreadyInUse,
#[error("no available ip address")]
NoAvailableIp,
#[error("{0}")]
Other(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataResponse {
pub ip_packet: bytes::Bytes,
}
fn make_bincode_serializer() -> impl bincode::Options {
@@ -38,3 +311,63 @@ fn make_bincode_serializer() -> impl bincode::Options {
.with_big_endian()
.with_varint_encoding()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn check_size_of_request() {
let connect = IpPacketRequest {
version: 4,
data: IpPacketRequestData::StaticConnect(
StaticConnectRequest {
request_id: 123,
ip: IpAddr::from([10, 0, 0, 1]),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
reply_to_hops: None,
reply_to_avg_mix_delays: None,
},
)
};
assert_eq!(connect.to_bytes().unwrap().len(), 107);
}
#[test]
fn check_size_of_data() {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1u8; 32]),
}),
};
assert_eq!(data.to_bytes().unwrap().len(), 35);
}
#[test]
fn serialize_and_deserialize_data_request() {
let data = IpPacketRequest {
version: 4,
data: IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
}),
};
let serialized = data.to_bytes().unwrap();
let deserialized = IpPacketRequest::from_reconstructed_message(
&nym_sphinx::receiver::ReconstructedMessage {
message: serialized,
sender_tag: None,
},
)
.unwrap();
assert_eq!(deserialized.version, 4);
assert_eq!(
deserialized.data,
IpPacketRequestData::Data(DataRequest {
ip_packet: bytes::Bytes::from(vec![1, 2, 4, 2, 5]),
})
);
}
}
+27 -5
View File
@@ -6,9 +6,10 @@ use criterion::{criterion_group, criterion_main, Criterion};
use ff::Field;
use group::{Curve, Group};
use nym_coconut::{
aggregate_signature_shares, aggregate_verification_keys, blind_sign, elgamal_keygen,
prepare_blind_sign, prove_bandwidth_credential, setup, ttp_keygen, verify_credential,
Attribute, BlindedSignature, Parameters, Signature, SignatureShare, VerificationKey,
aggregate_signature_shares, aggregate_verification_keys, blind_sign, prepare_blind_sign,
prove_bandwidth_credential, setup, ttp_keygen, verify_credential,
verify_partial_blind_signature, Attribute, BlindedSignature, Parameters, Signature,
SignatureShare, VerificationKey,
};
use rand::seq::SliceRandom;
use std::ops::Neg;
@@ -175,8 +176,6 @@ fn bench_coconut(c: &mut Criterion) {
let binding_number = params.random_scalar();
let private_attributes = vec![serial_number, binding_number];
let _elgamal_keypair = elgamal_keygen(&params);
// The prepare blind sign is performed by the user
let (pedersen_commitments_openings, blind_sign_request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
@@ -242,6 +241,29 @@ fn bench_coconut(c: &mut Criterion) {
.map(|keypair| keypair.verification_key())
.collect();
// verify a random partial blind signature
let rand_idx = 1;
let random_blind_signature = blinded_signatures.get(rand_idx).unwrap();
let partial_verification_key = verification_keys.get(rand_idx).unwrap();
group.bench_function(
&format!(
"verify_partial_blind_signature_{}_private_attributes_{}_public_attributes",
case.num_private_attrs, case.num_public_attrs
),
|b| {
b.iter(|| {
verify_partial_blind_signature(
&params,
&blind_sign_request,
&public_attributes,
random_blind_signature,
partial_verification_key,
)
})
},
);
// Lets bench worse case, ie aggregating all
let indices: Vec<u64> = (1..=case.num_authorities).collect();
// aggregate verification keys
+1
View File
@@ -14,6 +14,7 @@ pub use scheme::aggregation::aggregate_signature_shares;
pub use scheme::aggregation::aggregate_verification_keys;
pub use scheme::issuance::blind_sign;
pub use scheme::issuance::prepare_blind_sign;
pub use scheme::issuance::verify_partial_blind_signature;
pub use scheme::issuance::BlindSignRequest;
pub use scheme::keygen::ttp_keygen;
pub use scheme::keygen::KeyPair;
+172 -2
View File
@@ -3,12 +3,14 @@
use std::convert::TryFrom;
use std::convert::TryInto;
use std::ops::Neg;
use bls12_381::{G1Affine, G1Projective, Scalar};
use group::{Curve, GroupEncoding};
use bls12_381::{multi_miller_loop, G1Affine, G1Projective, G2Prepared, Scalar};
use group::{Curve, Group, GroupEncoding};
use crate::error::{CoconutError, Result};
use crate::proofs::ProofCmCs;
use crate::scheme::keygen::VerificationKey;
use crate::scheme::setup::Parameters;
use crate::scheme::BlindedSignature;
use crate::scheme::SecretKey;
@@ -318,6 +320,96 @@ pub fn blind_sign(
Ok(BlindedSignature(h, sig))
}
/// Verifies a partial blind signature using the provided parameters and validator's verification key.
///
/// # Arguments
///
/// * `params` - A reference to the cryptographic parameters.
/// * `blind_sign_request` - A reference to the blind signature request signed by the client.
/// * `public_attributes` - A reference to the public attributes included in the client's request.
/// * `blind_sig` - A reference to the issued partial blinded signature to be verified.
/// * `partial_verification_key` - A reference to the validator's partial verification key.
///
/// # Returns
///
/// A boolean indicating whether the partial blind signature is valid (`true`) or not (`false`).
///
/// # Remarks
///
/// This function verifies the correctness and validity of a partial blind signature using
/// the provided cryptographic parameters, blind signature request, blinded signature,
/// and partial verification key.
/// It calculates pairings based on the provided values and checks whether the partial blind signature
/// is consistent with the verification key and commitments in the blind signature request.
/// The function returns `true` if the partial blind signature is valid, and `false` otherwise.
pub fn verify_partial_blind_signature(
params: &Parameters,
blind_sign_request: &BlindSignRequest,
public_attributes: &[Attribute],
blind_sig: &BlindedSignature,
partial_verification_key: &VerificationKey,
) -> bool {
let num_private_attributes = blind_sign_request.private_attributes_commitments.len();
if num_private_attributes + public_attributes.len() > partial_verification_key.beta_g2.len() {
return false;
}
// TODO: we're losing some memory here due to extra allocation,
// but worst-case scenario (given SANE amount of attributes), it's just few kb at most
let c_neg = blind_sig.1.to_affine().neg();
let g2_prep = params.prepared_miller_g2();
let mut terms = vec![
// (c^{-1}, g2)
(c_neg, g2_prep.clone()),
// (s, alpha)
(
blind_sig.0.to_affine(),
G2Prepared::from(partial_verification_key.alpha.to_affine()),
),
];
// for each private attribute, add (cm_i, beta_i) to the miller terms
for (private_attr_commit, beta_g2) in blind_sign_request
.private_attributes_commitments
.iter()
.zip(&partial_verification_key.beta_g2)
{
// (cm_i, beta_i)
terms.push((
private_attr_commit.to_affine(),
G2Prepared::from(beta_g2.to_affine()),
))
}
// for each public attribute, add (s^pub_j, beta_{priv + j}) to the miller terms
for (pub_attr, beta_g2) in public_attributes.iter().zip(
partial_verification_key
.beta_g2
.iter()
.skip(num_private_attributes),
) {
// (s^pub_j, beta_j)
terms.push((
(blind_sig.0 * pub_attr).to_affine(),
G2Prepared::from(beta_g2.to_affine()),
))
}
// get the references to all the terms to get the arguments the miller loop expects
let terms_refs = terms.iter().map(|(g1, g2)| (g1, g2)).collect::<Vec<_>>();
// since checking whether e(a, b) == e(c, d)
// is equivalent to checking e(a, b) • e(c, d)^{-1} == id
// and thus to e(a, b) • e(c^{-1}, d) == id
//
// compute e(c^1, g2) • e(s, alpha) • e(cm_0, beta_0) • e(cm_i, beta_i) • (s^pub_0, beta_{i+1}) (s^pub_j, beta_{i + j})
multi_miller_loop(&terms_refs)
.final_exponentiation()
.is_identity()
.into()
}
#[cfg(test)]
pub fn sign(
params: &mut Parameters,
@@ -354,6 +446,7 @@ pub fn sign(
#[cfg(test)]
mod tests {
use super::*;
use crate::scheme::keygen::keygen;
#[test]
fn blind_sign_request_bytes_roundtrip() {
@@ -385,4 +478,81 @@ mod tests {
lambda
);
}
#[test]
fn successful_verify_partial_blind_signature() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let public_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
let validator_keypair = keygen(&params);
let blind_sig = blind_sign(
&params,
&validator_keypair.secret_key(),
&request,
&public_attributes,
)
.unwrap();
assert!(verify_partial_blind_signature(
&params,
&request,
&public_attributes,
&blind_sig,
&validator_keypair.verification_key()
));
}
#[test]
fn successful_verify_partial_blind_signature_no_public_attributes() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &[]).unwrap();
let validator_keypair = keygen(&params);
let blind_sig =
blind_sign(&params, &validator_keypair.secret_key(), &request, &[]).unwrap();
assert!(verify_partial_blind_signature(
&params,
&request,
&[],
&blind_sig,
&validator_keypair.verification_key()
));
}
#[test]
fn fail_verify_partial_blind_signature_with_wrong_key() {
let params = Parameters::new(4).unwrap();
let private_attributes = params.n_random_scalars(2);
let public_attributes = params.n_random_scalars(2);
let (_commitments_openings, request) =
prepare_blind_sign(&params, &private_attributes, &public_attributes).unwrap();
let validator_keypair = keygen(&params);
let validator2_keypair = keygen(&params);
let blind_sig = blind_sign(
&params,
&validator_keypair.secret_key(),
&request,
&public_attributes,
)
.unwrap();
// this assertion should fail, as we try to verify with a wrong validator key
assert!(!verify_partial_blind_signature(
&params,
&request,
&public_attributes,
&blind_sig,
&validator2_keypair.verification_key()
),);
}
}
+53 -16
View File
@@ -180,10 +180,10 @@ impl TunDevice {
);
// TODO: expire old entries
#[allow(irrefutable_let_patterns)]
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
nat_table.nat_table.insert(src_addr, tag);
}
// #[allow(irrefutable_let_patterns)]
// if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
// nat_table.nat_table.insert(src_addr, tag);
// }
timeout(
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
@@ -196,11 +196,13 @@ impl TunDevice {
// Receive reponse packets from the wild internet
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
log::debug!(
"iface: read Packet({src_addr} -> {dst_addr}, {} bytes)",
packet.len(),
);
// let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
// log::debug!(
// "iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
// packet.len(),
// );
let dst_addr =
parse_dst_addr(packet).ok_or(TunDeviceError::UnableToParseAddressIpHeaderMissing)?;
// Route packet to the correct peer.
@@ -220,13 +222,14 @@ impl TunDevice {
// But we can also do it by consulting the NAT table.
RoutingMode::Nat(ref nat_table) => {
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
.try_send((*tag, packet.to_vec()))
.map_err(|err| err.into());
}
// if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
// log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
// .try_send((*tag, packet.to_vec()))
.try_send((0, packet.to_vec()))
.map_err(|err| err.into());
// }
}
}
@@ -287,3 +290,37 @@ fn parse_src_dst_address(packet: &[u8]) -> Result<ParsedAddresses, TunDeviceErro
None => Err(TunDeviceError::UnableToParseAddressIpHeaderMissing),
}
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}
+8 -8
View File
@@ -476,6 +476,13 @@ impl<St> Gateway<St> {
});
}
self.start_client_websocket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shutdown.subscribe().named("websocket::Listener"),
Arc::new(coconut_verifier),
);
let nr_request_filter = if self.config.network_requester.enabled {
let embedded_nr = self
.start_network_requester(
@@ -496,7 +503,7 @@ impl<St> Gateway<St> {
if self.config.ip_packet_router.enabled {
let embedded_ip_sp = self
.start_ip_packet_router(
mix_forwarding_channel.clone(),
mix_forwarding_channel,
shutdown.subscribe().named("ip_service_provider"),
)
.await?;
@@ -513,13 +520,6 @@ impl<St> Gateway<St> {
.with_maybe_network_request_filter(nr_request_filter)
.start(shutdown.subscribe().named("http-api"))?;
self.start_client_websocket_listener(
mix_forwarding_channel,
active_clients_store,
shutdown.subscribe().named("websocket::Listener"),
Arc::new(coconut_verifier),
);
// Once this is a bit more mature, make this a commandline flag instead of a compile time
// flag
#[cfg(feature = "wireguard")]
@@ -24,7 +24,15 @@ struct MixnodeWithStakeAndPerformance {
impl MixnodeWithStakeAndPerformance {
fn to_selection_weight(&self) -> f64 {
let scaled_stake = self.total_stake * self.performance;
let scaled_performance = match self.performance.checked_pow(20) {
Ok(perf) => perf,
Err(overflow) => {
warn!("the node's performance ({}) has overflow while scaling it by the factor of 20: {overflow}. Setting it to 0 instead.", self.performance);
return 0.;
}
};
let scaled_stake = self.total_stake * scaled_performance;
stake_to_f64(scaled_stake)
}
}
@@ -168,6 +168,7 @@ impl MixnetClient {
}
}
#[derive(Clone)]
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
@@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
dashmap.workspace = true
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
@@ -27,6 +28,7 @@ nym-task = { path = "../../common/task" }
nym-tun = { path = "../../common/tun" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
rand = "0.8.5"
reqwest.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -30,6 +30,9 @@ pub enum IpPacketRouterError {
#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,
#[error("received packet has an invalid version: {0}")]
InvalidPacketVersion(u8),
#[error("failed to deserialize tagged packet: {source}")]
FailedToDeserializeTaggedPacket { source: bincode::Error },
@@ -0,0 +1,38 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
};
use crate::{ConnectedClient, TUN_DEVICE_ADDRESS};
// Find an available IP address in self.connected_clients
// TODO: make this nicer
fn generate_random_ip_within_subnet() -> Ipv4Addr {
let mut rng = rand::thread_rng();
// Generate a random number in the range 1-254
let last_octet = rand::Rng::gen_range(&mut rng, 1..=254);
Ipv4Addr::new(10, 0, 0, last_octet)
}
fn is_ip_taken(
connected_clients: &HashMap<IpAddr, ConnectedClient>,
tun_ip: Ipv4Addr,
ip: Ipv4Addr,
) -> bool {
connected_clients.contains_key(&ip.into()) || ip == tun_ip
}
// TODO: brute force approach. We could consider using a more efficient algorithm
pub(crate) fn find_new_ip(connected_clients: &HashMap<IpAddr, ConnectedClient>) -> Option<IpAddr> {
let mut new_ip = generate_random_ip_within_subnet();
let mut tries = 0;
let tun_ip = TUN_DEVICE_ADDRESS.parse::<Ipv4Addr>().unwrap();
while is_ip_taken(connected_clients, tun_ip, new_ip) {
new_ip = generate_random_ip_within_subnet();
tries += 1;
if tries > 100 {
return None;
}
}
Some(new_ip.into())
}
+401 -74
View File
@@ -1,8 +1,11 @@
#![cfg_attr(not(target_os = "linux"), allow(dead_code))]
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
path::Path,
sync::Arc,
time::Duration,
};
use error::IpPacketRouterError;
@@ -11,6 +14,10 @@ use nym_client_core::{
client::mix_traffic::transceiver::GatewayTransceiver,
config::disk_persistence::CommonClientPaths, HardcodedTopologyProvider, TopologyProvider,
};
use nym_ip_packet_requests::{
DynamicConnectFailureReason, IpPacketRequest, IpPacketRequestData, IpPacketResponse,
StaticConnectFailureReason,
};
use nym_sdk::{
mixnet::{InputMessage, MixnetMessageSender, Recipient},
NymNetworkDetails,
@@ -25,6 +32,7 @@ pub use crate::config::Config;
pub mod config;
pub mod error;
mod generate_new_ip;
mod request_filter;
// The interface used to route traffic
@@ -32,6 +40,9 @@ pub const TUN_BASE_NAME: &str = "nymtun";
pub const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0";
const DISCONNECT_TIMER_INTERVAL: Duration = Duration::from_secs(10);
const CLIENT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
pub struct OnStartData {
// to add more fields as required
pub address: Recipient,
@@ -157,9 +168,10 @@ impl IpPacketRouterBuilder {
_config: self.config,
request_filter: request_filter.clone(),
tun_task_tx,
tun_task_response_rx,
tun_task_response_rx: Some(tun_task_response_rx),
mixnet_client,
task_handle,
connected_clients: Default::default(),
};
log::info!("The address of this client is: {self_address}");
@@ -179,85 +191,173 @@ impl IpPacketRouterBuilder {
}
}
#[allow(unused)]
#[cfg_attr(not(target_os = "linux"), allow(unused))]
struct IpPacketRouter {
_config: Config,
request_filter: request_filter::RequestFilter,
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
tun_task_response_rx: Option<nym_tun::tun_task_channel::TunTaskResponseRx>,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
connected_clients: HashMap<IpAddr, ConnectedClient>,
}
#[allow(unused)]
struct ConnectedClient {
nym_address: Recipient,
last_activity: std::time::Instant,
}
#[cfg_attr(not(target_os = "linux"), allow(unused))]
impl IpPacketRouter {
async fn run(mut self) -> Result<(), IpPacketRouterError> {
let mut task_client = self.task_handle.fork("main_loop");
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpPacketRouter [main loop]: received shutdown");
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
if let Err(err) = self.on_message(msg).await {
log::error!("Error handling mixnet message: {err}");
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
};
},
packet = self.tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// Read recipient from env variable NYM_CLIENT_ADDR which is a base58
// string of the nym-address of the client that the packet should be
// sent back to.
//
// In the near future we will let the client expose it's nym-address
// directly, and after that, provide SURBS
let recipient = std::env::var("NYM_CLIENT_ADDR").ok().and_then(|addr| {
Recipient::try_from_base58_string(addr).ok()
});
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let input_message = InputMessage::new_regular(recipient, packet, lane, packet_type);
if let Err(err) = self.mixnet_client.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("NYM_CLIENT_ADDR not set or invalid");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
}
}
log::info!("IpPacketRouter: stopping");
Ok(())
}
async fn on_message(
async fn on_static_connect_request(
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<(), IpPacketRouterError> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
connect_request: nym_ip_packet_requests::StaticConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received static connect request from {sender_address}",
sender_address = connect_request.reply_to
);
let tagged_packet =
nym_ip_packet_requests::TaggedIpPacket::from_reconstructed_message(&reconstructed)
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket {
source: err,
})?;
let request_id = connect_request.request_id;
let requested_ip = connect_request.ip;
let reply_to = connect_request.reply_to;
// TODO: ignoring reply_to_hops and reply_to_avg_mix_delays for now
// Check that the IP is available in the set of connected clients
let is_ip_taken = self.connected_clients.contains_key(&requested_ip);
// let is_ip_taken = connected_clients
// .lock()
// .unwrap()
// .contains_key(&requested_ip);
// Check that the nym address isn't already registered
let is_nym_address_taken = self
.connected_clients
// .lock()
// .unwrap()
.values()
.any(|client| client.nym_address == reply_to);
match (is_ip_taken, is_nym_address_taken) {
(true, true) => {
log::info!("Connecting an already connected client");
// Update the last activity time for the client
if let Some(client) = self.connected_clients.get_mut(&requested_ip) {
client.last_activity = std::time::Instant::now();
} else {
log::error!("Failed to update last activity time for client");
}
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
}
(false, false) => {
log::info!("Connecting a new client");
self.connected_clients.insert(
requested_ip,
ConnectedClient {
nym_address: reply_to,
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(requested_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
}
(true, false) => {
log::info!("Requested IP is not available");
Ok(Some(IpPacketResponse::new_static_connect_failure(
request_id,
reply_to,
StaticConnectFailureReason::RequestedIpAlreadyInUse,
)))
}
(false, true) => {
log::info!("Nym address is already registered");
Ok(Some(IpPacketResponse::new_static_connect_failure(
request_id,
reply_to,
StaticConnectFailureReason::RequestedNymAddressAlreadyInUse,
)))
}
}
}
async fn on_dynamic_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::DynamicConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received dynamic connect request from {sender_address}",
sender_address = connect_request.reply_to
);
let request_id = connect_request.request_id;
let reply_to = connect_request.reply_to;
// TODO: ignoring reply_to_hops and reply_to_avg_mix_delays for now
// Check if it's the same client connecting again, then we just reuse the same IP
// TODO: this is problematic. Until we sign connect requests this means you can spam people
// with return traffic
let existing_ip = self.connected_clients.iter().find_map(|(ip, client)| {
if client.nym_address == reply_to {
Some(*ip)
} else {
None
}
});
if let Some(existing_ip) = existing_ip {
log::info!("Found existing client for nym address");
// Update the last activity time for the client
if let Some(client) = self.connected_clients.get_mut(&existing_ip) {
client.last_activity = std::time::Instant::now();
} else {
log::error!("Failed to update last activity time for client");
}
return Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id,
reply_to,
existing_ip,
)));
}
let Some(new_ip) = generate_new_ip::find_new_ip(&self.connected_clients) else {
log::info!("No available IP address");
return Ok(Some(IpPacketResponse::new_dynamic_connect_failure(
request_id,
reply_to,
DynamicConnectFailureReason::NoAvailableIp,
)));
};
self.connected_clients.insert(
new_ip,
ConnectedClient {
nym_address: reply_to,
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(new_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id, reply_to, new_ip,
)))
}
async fn on_data_request(
&mut self,
data_request: nym_ip_packet_requests::DataRequest,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!("Received data request");
// We don't forward packets that we are not able to parse. BUT, there might be a good
// reason to still forward them.
@@ -271,11 +371,20 @@ impl IpPacketRouter {
src_addr,
dst_addr,
dst,
} = parse_packet(&tagged_packet.packet)?;
} = parse_packet(&data_request.ip_packet)?;
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
// Check if there is a connected client for this src_addr. If there is, update the last activity time
// for the client. If there isn't, drop the packet.
if let Some(client) = self.connected_clients.get_mut(&src_addr) {
client.last_activity = std::time::Instant::now();
} else {
log::info!("Dropping packet: no connected client for {src_addr}");
return Ok(None);
}
// Filter check
if let Some(dst) = dst {
if !self.request_filter.check_address(&dst).await {
@@ -288,16 +397,200 @@ impl IpPacketRouter {
log::warn!("Ignoring filter check for packet without port number! TODO!");
}
// TODO: set the tag correctly. Can we just reuse sender_tag?
let peer_tag = 0;
// TODO: consider changing from Vec<u8> to bytes::Bytes?
// TODO: consider just removing the tag
let tag = 0;
self.tun_task_tx
.try_send((peer_tag, tagged_packet.packet.into()))
.try_send((tag, data_request.ip_packet.into()))
.map_err(|err| IpPacketRouterError::FailedToSendPacketToTun { source: err })?;
Ok(None)
}
async fn on_reconstructed_message(
&mut self,
reconstructed: ReconstructedMessage,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
);
// Check version of request
if let Some(version) = reconstructed.message.first() {
// The idea is that in the future we can add logic here to parse older versions to stay
// backwards compatible.
if *version != nym_ip_packet_requests::CURRENT_VERSION {
log::warn!("Received packet with invalid version");
return Err(IpPacketRouterError::InvalidPacketVersion(*version));
}
}
let request = IpPacketRequest::from_reconstructed_message(&reconstructed)
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket { source: err })?;
match request.data {
IpPacketRequestData::StaticConnect(connect_request) => {
self.on_static_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::DynamicConnect(connect_request) => {
self.on_dynamic_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
}
}
async fn run(mut self) -> Result<(), IpPacketRouterError> {
let mut task_client = self.task_handle.fork("main_loop");
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
let mixnet_client_sender = self.mixnet_client.split_sender();
let mixnet_client_sender_clone = mixnet_client_sender.clone();
// let connected_clients = Arc::new(std::sync::Mutex::new(
// HashMap::<IpAddr, ConnectedClient>::new(),
// ));
// let connected_clients_clone = connected_clients.clone();
let tun_task_response_rx = self.tun_task_response_rx.take();
let (connected_client_tx, mut connected_client_rx) = tokio::sync::mpsc::channel(16);
// Spawn TUN listener
tokio::spawn(async move {
let mut connected_clients = HashMap::<IpAddr, ConnectedClient>::new();
let mut tun_task_response_rx = tun_task_response_rx.unwrap();
loop {
tokio::select! {
connected_client_event = connected_client_rx.recv() => {
match connected_client_event {
Some(ConnectedClientEvent::Connect(ip, nym_addr)) => {
log::trace!("Connect client: {ip}");
connected_clients.insert(ip, ConnectedClient {
nym_address: nym_addr,
last_activity: std::time::Instant::now(),
});
},
Some(ConnectedClientEvent::Disconnect(ip)) => {
log::trace!("Disconnect client: {ip}");
connected_clients.remove(&ip);
},
None => {},
}
},
packet = tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// TODO: skip full parsing since we only need dst_addr
// let Ok(ParsedPacket {
// packet_type: _,
// src_addr: _,
// dst_addr,
// dst: _,
// }) = parse_packet(&packet) else {
// log::warn!("Failed to parse packet");
// continue;
// };
let Some(dst_addr) = parse_dst_addr(&packet) else {
log::warn!("Failed to parse packet");
continue;
};
let recipient = connected_clients.get(&dst_addr).map(|c| c.nym_address);
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
if let Err(err) = mixnet_client_sender_clone.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
}
}
});
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::debug!("IpPacketRouter [main loop]: received shutdown");
},
_ = disconnect_timer.tick() => {
let now = std::time::Instant::now();
let inactive_clients: Vec<IpAddr> = self.connected_clients.iter()
.filter_map(|(ip, client)| {
if now.duration_since(client.last_activity) > CLIENT_INACTIVITY_TIMEOUT {
Some(*ip)
} else {
None
}
})
.collect();
for ip in inactive_clients {
log::info!("Disconnect inactive client: {ip}");
self.connected_clients.remove(&ip);
connected_client_tx.send(ConnectedClientEvent::Disconnect(ip)).await.unwrap();
}
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg, &connected_client_tx).await {
Ok(Some(response)) => {
let Some(recipient) = response.recipient() else {
log::error!("IpPacketRouter [main loop]: failed to get recipient from response");
continue;
};
let response_packet = response.to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let lane = TransmissionLane::General;
let packet_type = None;
let input_message = InputMessage::new_regular(*recipient, response_packet, lane, packet_type);
if let Err(err) = mixnet_client_sender.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
},
Ok(None) => {
continue;
},
Err(err) => {
log::error!("Error handling mixnet message: {err}");
}
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
};
},
}
}
log::info!("IpPacketRouter: stopping");
Ok(())
}
}
enum ConnectedClientEvent {
Disconnect(IpAddr),
Connect(IpAddr, Recipient),
}
struct ParsedPacket<'a> {
packet_type: &'a str,
src_addr: IpAddr,
@@ -312,8 +605,8 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
})?;
let (packet_type, dst_port) = match headers.transport {
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
Some(etherparse::TransportSlice::Udp(header)) => ("udp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("tcp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
@@ -393,3 +686,37 @@ async fn create_mixnet_client(
.await
.map_err(|err| IpPacketRouterError::FailedToConnectToMixnet { source: err })
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}