Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 005b67ef0f | |||
| 2f98912778 |
@@ -68,10 +68,8 @@ jobs:
|
||||
working-directory: nym-connect/native/android
|
||||
env:
|
||||
ANDROID_SDK_ROOT: ${{ env.ANDROID_HOME }}
|
||||
SENTRY_AUTH_TOKEN: ${{ secrets.NYMS5_ANDROID_SENTRY_AUTH_TOKEN }}
|
||||
# build for arm64 and x86_64
|
||||
run: |
|
||||
echo "auth.token=$SENTRY_AUTH_TOKEN" | tee -a sentry.properties
|
||||
./gradlew :app:assembleArch64Debug
|
||||
./gradlew :app:assembleArch64Release
|
||||
|
||||
@@ -109,3 +107,4 @@ jobs:
|
||||
files: |
|
||||
apk/nyms5-arch64-debug.apk
|
||||
apk/nyms5-arch64-release.apk
|
||||
|
||||
|
||||
+1
-2
@@ -43,5 +43,4 @@ envs/qwerty.env
|
||||
.parcel-cache
|
||||
**/.DS_Store
|
||||
cpu-cycles/libcpucycles/build
|
||||
foxyfox.env
|
||||
gateway/deploy.sh
|
||||
foxyfox.env
|
||||
Generated
+255
-1074
File diff suppressed because it is too large
Load Diff
Generated
+3
-17
@@ -2240,7 +2240,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-bin-common"
|
||||
version = "0.6.0"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"clap",
|
||||
@@ -2255,7 +2255,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client-core"
|
||||
version = "1.1.15"
|
||||
version = "1.1.14"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.21.2",
|
||||
@@ -2300,7 +2300,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client-wasm"
|
||||
version = "1.1.1"
|
||||
version = "1.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -2697,7 +2697,6 @@ dependencies = [
|
||||
"nym-pemstore",
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -2724,7 +2723,6 @@ dependencies = [
|
||||
"nym-crypto",
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -2755,7 +2753,6 @@ dependencies = [
|
||||
"nym-sphinx-chunking",
|
||||
"nym-sphinx-forwarding",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -2887,7 +2884,6 @@ dependencies = [
|
||||
"nym-service-provider-directory-common",
|
||||
"nym-vesting-contract",
|
||||
"nym-vesting-contract-common",
|
||||
"openssl",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -2978,15 +2974,6 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-src"
|
||||
version = "111.26.0+1.1.1u"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efc62c9f12b22b8f5208c23a7200a442b2e5999f8bdf80233852122b5a4f6f37"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.87"
|
||||
@@ -2995,7 +2982,6 @@ checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"openssl-src",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
@@ -116,11 +116,11 @@ async function testWithTester() {
|
||||
// B) first get topology directly from nym-api
|
||||
// const validator = 'https://qwerty-validator-api.qa.nymte.ch/api';
|
||||
// const topology = await current_network_topology(validator)
|
||||
// const nodeTester = await new NymNodeTester(topology, undefined, preferredGateway);
|
||||
// const nodeTester = await new NymNodeTester(topology, preferredGateway);
|
||||
//
|
||||
// C) use nym-api in the constructor (note: it does no filtering for 'good' nodes on other layers)
|
||||
// const validator = 'https://qwerty-validator-api.qa.nymte.ch/api';
|
||||
// const nodeTester = await NymNodeTester.new_with_api(validator, undefined, preferredGateway)
|
||||
// const nodeTester = await NymNodeTester.new_with_api(validator, preferredGateway)
|
||||
|
||||
// D, E, F) you also don't have to specify the gateway. if you don't, a random one (from your topology) will be used
|
||||
// const topology = dummyTopology()
|
||||
@@ -139,40 +139,6 @@ async function testWithTester() {
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async function testerReconnection() {
|
||||
const validator = 'https://qwerty-validator-api.qa.nymte.ch/api';
|
||||
const nodeTester = await NymNodeTester.new_with_api(validator);
|
||||
|
||||
self.onmessage = async event => {
|
||||
if (event.data && event.data.kind) {
|
||||
switch (event.data.kind) {
|
||||
case 'TestPacket': {
|
||||
const {mixnodeIdentity} = event.data.args;
|
||||
console.log("starting node test...");
|
||||
|
||||
let result1 = await nodeTester.test_node(mixnodeIdentity);
|
||||
console.log("sleeping for 5s");
|
||||
await new Promise(r => setTimeout(r, 5000));
|
||||
await nodeTester.disconnect_from_gateway();
|
||||
|
||||
console.log("sleeping for 5s");
|
||||
await new Promise(r => setTimeout(r, 5000));
|
||||
|
||||
await nodeTester.reconnect_to_gateway();
|
||||
let result2 = await nodeTester.test_node(mixnodeIdentity);
|
||||
|
||||
printAndDisplayTestResult(result1)
|
||||
printAndDisplayTestResult(result2)
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
async function testWithNymClient() {
|
||||
@@ -364,17 +330,14 @@ async function main() {
|
||||
// sets up better stack traces in case of in-rust panics
|
||||
set_panic_hook();
|
||||
|
||||
// show reconnection capabilities
|
||||
// await testerReconnection()
|
||||
|
||||
// run test on simplified and dedicated tester:
|
||||
await testWithTester()
|
||||
// await testWithTester()
|
||||
|
||||
// hook-up the whole client for testing
|
||||
// await testWithNymClient()
|
||||
|
||||
// 'Normal' client setup (to send 'normal' messages)
|
||||
// await normalNymClientUsage()
|
||||
await normalNymClientUsage()
|
||||
}
|
||||
|
||||
// Let's get started!
|
||||
|
||||
@@ -4,9 +4,6 @@
|
||||
// due to expansion of #[wasm_bindgen] macro on NodeTestResult
|
||||
#![allow(clippy::drop_non_drop)]
|
||||
|
||||
use crate::error::WasmClientError;
|
||||
use crate::tester::LockedGatewayClient;
|
||||
use js_sys::Promise;
|
||||
use nym_node_tester_utils::processor::Received;
|
||||
use nym_node_tester_utils::receiver::ReceivedReceiver;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -15,7 +12,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_bindgen_futures::future_to_promise;
|
||||
use wasm_utils::{console_log, console_warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -111,37 +107,3 @@ impl Drop for TestMarker {
|
||||
self.value.store(false, Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait GatewayReconnection {
|
||||
fn disconnect_from_gateway(&self) -> Promise;
|
||||
|
||||
fn reconnect_to_gateway(&self) -> Promise;
|
||||
}
|
||||
|
||||
impl GatewayReconnection for LockedGatewayClient {
|
||||
fn disconnect_from_gateway(&self) -> Promise {
|
||||
let this = self.clone();
|
||||
|
||||
future_to_promise(async move {
|
||||
let mut guard = this.lock().await;
|
||||
guard
|
||||
.disconnect()
|
||||
.await
|
||||
.map_err(|err| JsValue::from(WasmClientError::from(err)))?;
|
||||
Ok(JsValue::undefined())
|
||||
})
|
||||
}
|
||||
|
||||
fn reconnect_to_gateway(&self) -> Promise {
|
||||
let this = self.clone();
|
||||
|
||||
future_to_promise(async move {
|
||||
let mut guard = this.lock().await;
|
||||
guard
|
||||
.try_reconnect()
|
||||
.await
|
||||
.map_err(|err| JsValue::from(WasmClientError::from(err)))?;
|
||||
Ok(JsValue::undefined())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::helpers::{current_network_topology_async, setup_from_topology};
|
||||
use crate::storage::ClientStorage;
|
||||
use crate::tester::ephemeral_receiver::EphemeralTestReceiver;
|
||||
use crate::tester::helpers::{
|
||||
GatewayReconnection, NodeTestResult, ReceivedReceiverWrapper, TestMarker, WasmTestMessageExt,
|
||||
NodeTestResult, ReceivedReceiverWrapper, TestMarker, WasmTestMessageExt,
|
||||
};
|
||||
use crate::topology::WasmNymTopology;
|
||||
use futures::channel::mpsc;
|
||||
@@ -73,7 +73,6 @@ pub struct NymNodeTester {
|
||||
#[wasm_bindgen]
|
||||
pub struct NymNodeTesterBuilder {
|
||||
gateway: Option<IdentityKey>,
|
||||
id: Option<String>,
|
||||
|
||||
base_topology: NymTopology,
|
||||
|
||||
@@ -95,11 +94,9 @@ impl NymNodeTesterBuilder {
|
||||
#[wasm_bindgen(constructor)]
|
||||
pub fn new(
|
||||
base_topology: WasmNymTopology,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> NymNodeTesterBuilder {
|
||||
NymNodeTesterBuilder {
|
||||
id,
|
||||
gateway,
|
||||
base_topology: base_topology.into(),
|
||||
bandwidth_controller: None,
|
||||
@@ -108,20 +105,15 @@ impl NymNodeTesterBuilder {
|
||||
|
||||
async fn _new_with_api(
|
||||
api_url: String,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> Result<Self, WasmClientError> {
|
||||
let topology = current_network_topology_async(api_url).await?;
|
||||
Ok(NymNodeTesterBuilder::new(topology, id, gateway))
|
||||
Ok(NymNodeTesterBuilder::new(topology, gateway))
|
||||
}
|
||||
|
||||
pub fn new_with_api(
|
||||
api_url: String,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> Promise {
|
||||
pub fn new_with_api(gateway: Option<IdentityKey>, api_url: String) -> Promise {
|
||||
future_to_promise(async move {
|
||||
Self::_new_with_api(api_url, id, gateway)
|
||||
Self::_new_with_api(api_url, gateway)
|
||||
.await
|
||||
.into_promise_result()
|
||||
})
|
||||
@@ -141,13 +133,7 @@ impl NymNodeTesterBuilder {
|
||||
async fn _setup_client(mut self) -> Result<NymNodeTester, WasmClientError> {
|
||||
let task_manager = TaskManager::default();
|
||||
|
||||
let storage_id = if let Some(client_id) = &self.id {
|
||||
format!("{NODE_TESTER_ID}-{client_id}")
|
||||
} else {
|
||||
NODE_TESTER_ID.to_owned()
|
||||
};
|
||||
|
||||
let client_store = ClientStorage::new_async(&storage_id, None).await?;
|
||||
let client_store = ClientStorage::new_async(NODE_TESTER_ID, None).await?;
|
||||
|
||||
let init_details = self.gateway_info(&client_store).await?;
|
||||
let gateway_endpoint = init_details.gateway_details;
|
||||
@@ -248,46 +234,29 @@ async fn test_mixnode(
|
||||
impl NymNodeTester {
|
||||
#[wasm_bindgen(constructor)]
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
topology: WasmNymTopology,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> Promise {
|
||||
pub fn new(topology: WasmNymTopology, gateway: Option<IdentityKey>) -> Promise {
|
||||
console_log!("constructing node tester!");
|
||||
NymNodeTesterBuilder::new(topology, id, gateway).setup_client()
|
||||
NymNodeTesterBuilder::new(topology, gateway).setup_client()
|
||||
}
|
||||
|
||||
async fn _new_with_api(
|
||||
api_url: String,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> Result<Self, WasmClientError> {
|
||||
NymNodeTesterBuilder::_new_with_api(api_url, id, gateway)
|
||||
NymNodeTesterBuilder::_new_with_api(api_url, gateway)
|
||||
.await?
|
||||
._setup_client()
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn new_with_api(
|
||||
api_url: String,
|
||||
id: Option<String>,
|
||||
gateway: Option<IdentityKey>,
|
||||
) -> Promise {
|
||||
pub fn new_with_api(api_url: String, gateway: Option<IdentityKey>) -> Promise {
|
||||
future_to_promise(async move {
|
||||
Self::_new_with_api(api_url, id, gateway)
|
||||
Self::_new_with_api(api_url, gateway)
|
||||
.await
|
||||
.into_promise_result()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn disconnect_from_gateway(&self) -> Promise {
|
||||
self.gateway_client.disconnect_from_gateway()
|
||||
}
|
||||
|
||||
pub fn reconnect_to_gateway(&self) -> Promise {
|
||||
self.gateway_client.reconnect_to_gateway()
|
||||
}
|
||||
|
||||
fn prepare_test_packets(
|
||||
&self,
|
||||
mixnode_identity: String,
|
||||
|
||||
@@ -10,7 +10,7 @@ use log::*;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::cover::generate_loop_cover_packet;
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
use nym_sphinx::params::PacketSize;
|
||||
use nym_sphinx::utils::sample_poisson_duration;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
@@ -63,8 +63,6 @@ where
|
||||
|
||||
/// Optional secondary predefined packet size used for the loop cover messages.
|
||||
secondary_packet_size: Option<PacketSize>,
|
||||
|
||||
packet_type: PacketType,
|
||||
}
|
||||
|
||||
impl<R> Stream for LoopCoverTrafficStream<R>
|
||||
@@ -137,7 +135,6 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
topology_access,
|
||||
primary_packet_size: traffic_config.primary_packet_size,
|
||||
secondary_packet_size: traffic_config.secondary_packet_size,
|
||||
packet_type: traffic_config.packet_type,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,7 +194,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
self.average_ack_delay,
|
||||
self.cover_traffic.loop_cover_traffic_average_delay,
|
||||
cover_traffic_packet_size,
|
||||
self.packet_type,
|
||||
nym_sphinx::params::PacketType::Mix,
|
||||
)
|
||||
.expect("Somehow failed to generate a loop cover message with a valid topology");
|
||||
|
||||
|
||||
@@ -724,7 +724,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn try_reconnect(&mut self) -> Result<(), GatewayClientError> {
|
||||
async fn try_reconnect(&mut self) -> Result<(), GatewayClientError> {
|
||||
if !self.connection.is_established() {
|
||||
self.establish_connection().await?;
|
||||
}
|
||||
@@ -738,12 +738,6 @@ impl<C, St> GatewayClient<C, St> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn disconnect(&mut self) -> Result<(), GatewayClientError> {
|
||||
self.recover_socket_connection().await?;
|
||||
self.connection = SocketState::NotConnected;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn authenticate_and_start(&mut self) -> Result<Arc<SharedKeys>, GatewayClientError>
|
||||
where
|
||||
C: DkgQueryClient + Send + Sync,
|
||||
|
||||
@@ -48,21 +48,17 @@ impl PacketRouter {
|
||||
// data he takes the SURB-ACK and first hop address.
|
||||
// currently SURB-ACKs are attached in EVERY packet, even cover, so this is always true
|
||||
let ack_overhead = PacketSize::AckPacket.size() + MAX_NODE_ADDRESS_UNPADDED_LEN;
|
||||
let outfox_ack_overhead =
|
||||
PacketSize::OutfoxAckPacket.size() + MAX_NODE_ADDRESS_UNPADDED_LEN;
|
||||
|
||||
for received_packet in unwrapped_packets {
|
||||
if received_packet.len() == PacketSize::AckPacket.plaintext_size()
|
||||
// we don't know the real size of the payload, it could be anything <= 48 bytes
|
||||
|| received_packet.len() <= PacketSize::OutfoxAckPacket.plaintext_size()
|
||||
|| received_packet.len() == PacketSize::OutfoxAckPacket.plaintext_size()
|
||||
{
|
||||
received_acks.push(received_packet);
|
||||
} else if received_packet.len()
|
||||
== PacketSize::RegularPacket.plaintext_size() - ack_overhead
|
||||
|| received_packet.len()
|
||||
== PacketSize::OutfoxRegularPacket.plaintext_size() - outfox_ack_overhead
|
||||
|| received_packet.len()
|
||||
== PacketSize::OutfoxRegularPacket.size() - outfox_ack_overhead
|
||||
== PacketSize::OutfoxRegularPacket.plaintext_size() - ack_overhead
|
||||
|| received_packet.len() == PacketSize::OutfoxRegularPacket.size() - 6
|
||||
{
|
||||
trace!("routing regular packet");
|
||||
received_messages.push(received_packet);
|
||||
|
||||
@@ -15,9 +15,10 @@ use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::net::{TcpStream, UdpSocket};
|
||||
use tokio::time::sleep;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
|
||||
pub struct Config {
|
||||
initial_reconnection_backoff: Duration,
|
||||
@@ -57,136 +58,98 @@ pub trait SendWithoutResponse {
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
|
||||
conn_new: Option<mpsc::Sender<(FramedNymPacket, SocketAddr)>>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
struct ConnectionSender {
|
||||
channel: mpsc::Sender<FramedNymPacket>,
|
||||
current_reconnection_attempt: Arc<AtomicU32>,
|
||||
}
|
||||
|
||||
impl ConnectionSender {
|
||||
fn new(channel: mpsc::Sender<FramedNymPacket>) -> Self {
|
||||
ConnectionSender {
|
||||
channel,
|
||||
current_reconnection_attempt: Arc::new(AtomicU32::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: Config) -> Client {
|
||||
Client {
|
||||
conn_new: HashMap::new(),
|
||||
conn_new: None,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
async fn manage_connection(
|
||||
address: SocketAddr,
|
||||
receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: &AtomicU32,
|
||||
receiver: mpsc::Receiver<(FramedNymPacket, SocketAddr)>,
|
||||
) {
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to {}", address);
|
||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||
current_reconnection.store(0, Ordering::Release);
|
||||
Framed::new(stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"failed to establish connection to {} (err: {})",
|
||||
address, err
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!(
|
||||
"failed to connect to {} within {:?}",
|
||||
address, connection_timeout
|
||||
);
|
||||
|
||||
// we failed to connect - increase reconnection attempt
|
||||
current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
let socket = match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(socket) => socket,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = receiver.map(Ok).forward(conn).await {
|
||||
if let Err(err) = receiver.map(Ok).forward(framed_conn).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
|
||||
address
|
||||
"connection manager is finished. Either the connection failed or mixnet client got dropped"
|
||||
);
|
||||
}
|
||||
|
||||
/// If we're trying to reconnect, determine how long we should wait.
|
||||
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
||||
if current_attempt == 0 {
|
||||
None
|
||||
} else {
|
||||
let exp = 2_u32.checked_pow(current_attempt);
|
||||
let backoff = exp
|
||||
.and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
|
||||
.unwrap_or(self.config.maximum_reconnection_backoff);
|
||||
// fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
||||
// if current_attempt == 0 {
|
||||
// None
|
||||
// } else {
|
||||
// let exp = 2_u32.checked_pow(current_attempt);
|
||||
// let backoff = exp
|
||||
// .and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
|
||||
// .unwrap_or(self.config.maximum_reconnection_backoff);
|
||||
|
||||
Some(std::cmp::min(
|
||||
backoff,
|
||||
self.config.maximum_reconnection_backoff,
|
||||
))
|
||||
}
|
||||
}
|
||||
// Some(std::cmp::min(
|
||||
// backoff,
|
||||
// self.config.maximum_reconnection_backoff,
|
||||
// ))
|
||||
// }
|
||||
// }
|
||||
|
||||
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
|
||||
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
|
||||
|
||||
// this CAN'T fail because we just created the channel which has a non-zero capacity
|
||||
if self.config.maximum_connection_buffer_size > 0 {
|
||||
sender.try_send(pending_packet).unwrap();
|
||||
sender.try_send((pending_packet, address.into())).unwrap();
|
||||
}
|
||||
self.conn_new = Some(sender);
|
||||
|
||||
// if we already tried to connect to `address` before, grab the current attempt count
|
||||
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
|
||||
existing.channel = sender;
|
||||
Arc::clone(&existing.current_reconnection_attempt)
|
||||
} else {
|
||||
let new_entry = ConnectionSender::new(sender);
|
||||
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
self.conn_new.insert(address, new_entry);
|
||||
current_attempt
|
||||
};
|
||||
// let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
|
||||
// existing.channel = sender;
|
||||
// Arc::clone(&existing.current_reconnection_attempt)
|
||||
// } else {
|
||||
// let new_entry = ConnectionSender::new(sender);
|
||||
// let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
// self.conn_new.insert(address, new_entry);
|
||||
// current_attempt
|
||||
// };
|
||||
|
||||
// load the actual value.
|
||||
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
||||
let backoff = self.determine_backoff(reconnection_attempt);
|
||||
// let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
||||
// let backoff = self.determine_backoff(reconnection_attempt);
|
||||
|
||||
// copy the value before moving into another task
|
||||
let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
// let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// before executing the manager, wait for what was specified, if anything
|
||||
if let Some(backoff) = backoff {
|
||||
trace!("waiting for {:?} before attempting connection", backoff);
|
||||
sleep(backoff).await;
|
||||
}
|
||||
// if let Some(backoff) = backoff {
|
||||
// trace!("waiting for {:?} before attempting connection", backoff);
|
||||
// sleep(backoff).await;
|
||||
// }
|
||||
|
||||
Self::manage_connection(
|
||||
address.into(),
|
||||
receiver,
|
||||
initial_connection_timeout,
|
||||
¤t_reconnection_attempt,
|
||||
//initial_connection_timeout,
|
||||
//¤t_reconnection_attempt,
|
||||
)
|
||||
.await
|
||||
});
|
||||
@@ -204,48 +167,53 @@ impl SendWithoutResponse for Client {
|
||||
let framed_packet =
|
||||
FramedNymPacket::new(packet, packet_type, self.config.use_legacy_version);
|
||||
|
||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
match &self.conn_new {
|
||||
Some(sender) => {
|
||||
if let Err(err) = sender.clone().try_send((framed_packet, address.into())) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner().0);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
debug!("Sending packet to {:?}", address);
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {}", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"connection is in progress",
|
||||
))
|
||||
|
||||
None => {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection");
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"connection is in progress",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,3 @@ nym-multisig-contract-common = { path = "../cosmwasm-smart-contracts/multisig-co
|
||||
nym-service-provider-directory-common = { path = "../cosmwasm-smart-contracts/service-provider-directory" }
|
||||
nym-name-service-common = { path = "../cosmwasm-smart-contracts/name-service" }
|
||||
nym-sphinx = { path = "../../common/nymsphinx" }
|
||||
|
||||
nym-pemstore = { path = "../../common/pemstore", version = "0.3.0" }
|
||||
nym-types = { path = "../../common/types" }
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_types::helpers::ConsoleSigningOutput;
|
||||
use nym_validator_client::nyxd::error::NyxdError;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
#[clap(args_conflicts_with_subcommands = true, subcommand_required = true)]
|
||||
pub struct MixnetOperatorsIdentityKey {
|
||||
#[clap(subcommand)]
|
||||
pub command: MixnetOperatorsIdentityKeyCommands,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum MixnetOperatorsIdentityKeyCommands {
|
||||
/// Register a name alias for a nym address
|
||||
Sign(SignArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct SignArgs {
|
||||
/// Path to private identity key (example: private_identity_key.pem)
|
||||
#[clap(long)]
|
||||
private_key: PathBuf,
|
||||
|
||||
/// Base58 encoded message to sign
|
||||
#[clap(long)]
|
||||
base58_msg: String,
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
}
|
||||
|
||||
pub async fn sign(args: SignArgs) -> Result<(), NyxdError> {
|
||||
eprintln!(">>> loading: {}", args.private_key.display());
|
||||
let private_identity_key: identity::PrivateKey =
|
||||
nym_pemstore::load_key(args.private_key).expect("failed to load key");
|
||||
|
||||
print_signed_msg(&private_identity_key, &args.base58_msg, args.output);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_signed_msg(private_key: &identity::PrivateKey, raw_msg: &str, output: OutputFormat) {
|
||||
let trimmed = raw_msg.trim();
|
||||
eprintln!(">>> attempting to sign: {trimmed}");
|
||||
|
||||
let Ok(decoded) = bs58::decode(trimmed).into_vec() else {
|
||||
println!("failed to base58 decode the message, did you copy it correctly?");
|
||||
return;
|
||||
};
|
||||
|
||||
eprintln!(">>> decoding the message...");
|
||||
|
||||
// we don't really care about what particular information is embedded inside of it,
|
||||
// we just want to know if user correctly copied the string, i.e. whether it's a valid bs58 encoded json
|
||||
if serde_json::from_slice::<serde_json::Value>(&decoded).is_err() {
|
||||
println!("failed to parse the message after decoding, did you copy it correctly?");
|
||||
return;
|
||||
};
|
||||
|
||||
// if this is a valid json, it MUST be a valid string
|
||||
let decoded_string = String::from_utf8(decoded.clone()).unwrap();
|
||||
let signature = private_key.sign(&decoded).to_base58_string();
|
||||
|
||||
let sign_output = ConsoleSigningOutput::new(decoded_string, signature);
|
||||
println!("{}", output.format(&sign_output));
|
||||
}
|
||||
@@ -4,7 +4,6 @@
|
||||
use clap::{Args, Subcommand};
|
||||
|
||||
pub mod gateway;
|
||||
pub mod identity_key;
|
||||
pub mod mixnode;
|
||||
pub mod name;
|
||||
pub mod service;
|
||||
@@ -27,6 +26,4 @@ pub enum MixnetOperatorsCommands {
|
||||
ServiceProvider(service::MixnetOperatorsService),
|
||||
/// Manage your registered name
|
||||
Name(name::MixnetOperatorsName),
|
||||
/// Sign messages using your private identity key
|
||||
IdentityKey(identity_key::MixnetOperatorsIdentityKey),
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use nym_sphinx_acknowledgements::surb_ack::SurbAckRecoveryError;
|
||||
use nym_sphinx_addressing::nodes::NymNodeRoutingAddressError;
|
||||
use nym_sphinx_types::{NymPacketError, OutfoxError, SphinxError};
|
||||
use nym_sphinx_types::{NymPacketError, SphinxError};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -25,7 +25,4 @@ pub enum MixProcessingError {
|
||||
|
||||
#[error("the received packet was set to use the very old and very much deprecated 'VPN' mode")]
|
||||
ReceivedOldTypeVpnPacket,
|
||||
|
||||
#[error("failed to process received outfox packet: {0}")]
|
||||
OutfoxProcessingError(#[from] OutfoxError),
|
||||
}
|
||||
|
||||
@@ -20,14 +20,12 @@ use tracing::instrument;
|
||||
|
||||
type ForwardAck = MixPacket;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProcessedFinalHop {
|
||||
pub destination: DestinationAddressBytes,
|
||||
pub forward_ack: Option<ForwardAck>,
|
||||
pub message: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MixProcessingResult {
|
||||
/// Contains unwrapped data that should first get delayed before being sent to next hop.
|
||||
ForwardHop(MixPacket, Option<SphinxDelay>),
|
||||
@@ -143,7 +141,7 @@ impl SphinxPacketProcessor {
|
||||
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
|
||||
Ok((first_hop, packet)) => (first_hop, packet),
|
||||
Err(err) => {
|
||||
info!("Failed to recover first hop from ack data: {err}");
|
||||
debug!("Failed to recover first hop from ack data: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
@@ -207,7 +205,7 @@ impl SphinxPacketProcessor {
|
||||
if packet.is_final_hop() {
|
||||
self.process_final_hop(
|
||||
DestinationAddressBytes::from_bytes(next_address),
|
||||
packet.recover_plaintext()?.to_vec(),
|
||||
packet.recover_plaintext().to_vec(),
|
||||
packet_size,
|
||||
packet_type,
|
||||
)
|
||||
@@ -241,14 +239,7 @@ impl SphinxPacketProcessor {
|
||||
|
||||
// for forward packets, extract next hop and set delay (but do NOT delay here)
|
||||
// for final packets, extract SURBAck
|
||||
let final_processing_result =
|
||||
self.perform_final_processing(processed_packet, packet_size, packet_type);
|
||||
|
||||
if final_processing_result.is_err() {
|
||||
error!("{:?}", final_processing_result)
|
||||
}
|
||||
|
||||
final_processing_result
|
||||
self.perform_final_processing(processed_packet, packet_size, packet_type)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,12 +15,11 @@ thiserror = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
nym-crypto = { path = "../../crypto", features = ["symmetric", "rand"] }
|
||||
nym-pemstore = { path = "../../pemstore" }
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-routing = { path = "../routing" }
|
||||
nym-sphinx-types = { path = "../types" }
|
||||
nym-pemstore = { path = "../../pemstore" }
|
||||
nym-topology = { path = "../../topology" }
|
||||
|
||||
[features]
|
||||
serde = ["serde_crate", "generic-array"]
|
||||
serde = ["serde_crate", "generic-array"]
|
||||
@@ -9,7 +9,7 @@ use nym_sphinx_addressing::nodes::{
|
||||
};
|
||||
use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{PacketType, DEFAULT_NUM_MIX_HOPS};
|
||||
use nym_sphinx_types::delays::Delay;
|
||||
use nym_sphinx_types::delays::{self, Delay};
|
||||
use nym_sphinx_types::{NymPacket, NymPacketError, MIN_PACKET_SIZE};
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
@@ -51,7 +51,7 @@ impl SurbAck {
|
||||
{
|
||||
let route =
|
||||
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
|
||||
let delays = delays::generate_from_average_duration(route.len(), average_delay);
|
||||
let destination = recipient.as_sphinx_destination();
|
||||
|
||||
let surb_ack_payload = prepare_identifier(rng, ack_key, marshaled_fragment_id);
|
||||
|
||||
@@ -16,7 +16,6 @@ thiserror = "1"
|
||||
nym-crypto = { path = "../../crypto", features = ["symmetric", "rand"] }
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-routing = { path = "../routing" }
|
||||
nym-sphinx-types = { path = "../types" }
|
||||
nym-topology = { path = "../../topology" }
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ use nym_sphinx_addressing::clients::Recipient;
|
||||
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
|
||||
use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
|
||||
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
|
||||
use nym_sphinx_types::{delays, NymPacket, SURBMaterial, SphinxError, SURB};
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::de::{Error as SerdeError, Visitor};
|
||||
@@ -96,7 +96,7 @@ impl ReplySurb {
|
||||
{
|
||||
let route =
|
||||
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
|
||||
let delays = delays::generate_from_average_duration(route.len(), average_delay);
|
||||
let destination = recipient.as_sphinx_destination();
|
||||
|
||||
let surb_material = SURBMaterial::new(route, delays, destination);
|
||||
|
||||
@@ -15,8 +15,7 @@ nym-crypto = { path = "../../crypto" }
|
||||
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
nym-sphinx-chunking = { path = "../chunking" }
|
||||
nym-sphinx-forwarding = { path = "../forwarding" }
|
||||
nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-routing = { path = "../routing" }
|
||||
nym-sphinx-forwarding = { path = "../forwarding" }
|
||||
nym-sphinx-types = { path = "../types" }
|
||||
nym-topology = { path = "../../topology" }
|
||||
|
||||
@@ -13,7 +13,7 @@ use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{
|
||||
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType, DEFAULT_NUM_MIX_HOPS,
|
||||
};
|
||||
use nym_sphinx_types::NymPacket;
|
||||
use nym_sphinx_types::{delays, NymPacket};
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use std::convert::TryFrom;
|
||||
@@ -91,7 +91,6 @@ where
|
||||
>(rng, full_address.encryption_key());
|
||||
|
||||
let public_key_bytes = ephemeral_keypair.public_key().to_bytes();
|
||||
|
||||
let cover_size = packet_size.plaintext_size() - public_key_bytes.len() - ack_bytes.len();
|
||||
|
||||
let mut cover_content: Vec<_> = LOOP_COVER_MESSAGE_PAYLOAD
|
||||
@@ -120,38 +119,22 @@ where
|
||||
|
||||
let route =
|
||||
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
|
||||
let delays = delays::generate_from_average_duration(route.len(), average_packet_delay);
|
||||
let destination = full_address.as_sphinx_destination();
|
||||
|
||||
// once merged, that's an easy rng injection point for sphinx packets : )
|
||||
let packet = NymPacket::sphinx_build(
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?;
|
||||
|
||||
let first_hop_address =
|
||||
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
|
||||
|
||||
// once merged, that's an easy rng injection point for sphinx packets : )
|
||||
let packet = match packet_type {
|
||||
PacketType::Mix => NymPacket::sphinx_build(
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?,
|
||||
#[allow(deprecated)]
|
||||
PacketType::Vpn => NymPacket::sphinx_build(
|
||||
packet_size.payload_size(),
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
&delays,
|
||||
)?,
|
||||
PacketType::Outfox => NymPacket::outfox_build(
|
||||
packet_payload,
|
||||
&route,
|
||||
&destination,
|
||||
Some(packet_size.plaintext_size()),
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok(MixPacket::new(first_hop_address, packet, packet_type))
|
||||
Ok(MixPacket::new(first_hop_address, packet, PacketType::Mix))
|
||||
}
|
||||
|
||||
/// Helper function used to determine if given message represents a loop cover message.
|
||||
|
||||
@@ -26,7 +26,7 @@ const ACK_IV_SIZE: usize = 16;
|
||||
|
||||
const ACK_PACKET_SIZE: usize = ACK_IV_SIZE + FRAG_ID_LEN + SPHINX_PACKET_OVERHEAD;
|
||||
const REGULAR_PACKET_SIZE: usize = 2 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_8: usize = 8 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_8: usize = 1 * 512 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_16: usize = 16 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_32: usize = 32 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use nym_sphinx_addressing::clients::Recipient;
|
||||
use nym_sphinx_types::{delays, Delay, Node};
|
||||
use nym_sphinx_types::Node;
|
||||
use thiserror::Error;
|
||||
|
||||
pub trait SphinxRouteMaker {
|
||||
@@ -43,11 +41,3 @@ impl SphinxRouteMaker for Vec<Node> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_hop_delays(average_packet_delay: Duration, num_hops: usize) -> Vec<Delay> {
|
||||
if average_packet_delay.is_zero() {
|
||||
vec![nym_sphinx_types::Delay::new_from_millis(0); num_hops]
|
||||
} else {
|
||||
delays::generate_from_average_duration(num_hops, average_packet_delay)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
|
||||
use nym_sphinx_types::{Delay, NymPacket};
|
||||
use nym_sphinx_types::{delays, Delay, NymPacket};
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::convert::TryFrom;
|
||||
@@ -233,7 +233,7 @@ pub trait FragmentPreparer {
|
||||
|
||||
// including set of delays
|
||||
let delays =
|
||||
nym_sphinx_routing::generate_hop_delays(self.average_packet_delay(), route.len());
|
||||
delays::generate_from_average_duration(route.len(), self.average_packet_delay());
|
||||
|
||||
// create the actual sphinx packet here. With valid route and correct payload size,
|
||||
// there's absolutely no reason for this call to fail.
|
||||
|
||||
@@ -177,29 +177,20 @@ impl TaskManager {
|
||||
drop(notify_rx);
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let interrupt_future = tokio::signal::ctrl_c();
|
||||
|
||||
// in wasm we'll never get our shutdown anyway...
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let interrupt_future = futures::future::pending::<()>();
|
||||
futures::future::pending::<()>().await;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let wait_future = tokio::time::sleep(Duration::from_secs(self.shutdown_timer_secs));
|
||||
|
||||
// TODO: we should be using a `Delay` here for wasm
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let wait_future = futures::future::pending::<()>();
|
||||
|
||||
tokio::select! {
|
||||
_ = self.notify_tx.closed() => {
|
||||
log::info!("All registered tasks succesfully shutdown");
|
||||
},
|
||||
_ = interrupt_future => {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
log::info!("Forcing shutdown");
|
||||
}
|
||||
_ = wait_future => {
|
||||
log::info!("Timeout reached, forcing shutdown");
|
||||
_ = tokio::time::sleep(Duration::from_secs(self.shutdown_timer_secs)) => {
|
||||
log::info!("Timout reached, forcing shutdown");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
+3
-30
@@ -34,41 +34,16 @@ pretty_env_logger = "0.4"
|
||||
rand = "0.7"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { version = "0.5", features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite", "macros", "migrate", ] }
|
||||
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
|
||||
thiserror = "1"
|
||||
tokio = { version = "1.24.1", features = [
|
||||
"rt-multi-thread",
|
||||
"net",
|
||||
"signal",
|
||||
"fs",
|
||||
] }
|
||||
tokio = { version = "1.24.1", features = [ "rt-multi-thread", "net", "signal", "fs", ] }
|
||||
tokio-stream = { version = "0.1.11", features = ["fs"] }
|
||||
tokio-tungstenite = "0.14"
|
||||
tokio-util = { version = "0.7.4", features = ["codec"] }
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
# wireguard
|
||||
# Forked it to be able to bump x25519-dalek to rc.3
|
||||
boringtun = { git = "https://github.com/durch/boringtun.git" }
|
||||
base64 = "0.21"
|
||||
x25519-dalek = { version = "=2.0.0-rc.3", features = [
|
||||
"reusable_secrets",
|
||||
"static_secrets",
|
||||
] }
|
||||
etherparse = "0.13.0"
|
||||
pnet = "0.34.0"
|
||||
bytes = "1.4.0"
|
||||
async-recursion = "1.0.4"
|
||||
smoltcp = "0.10.0"
|
||||
tun-tap = "0.1.3"
|
||||
|
||||
# internal
|
||||
nym-api-requests = { path = "../nym-api/nym-api-requests" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
@@ -85,9 +60,7 @@ nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-statistics-common = { path = "../common/statistics" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client", features = [
|
||||
"nyxd-client",
|
||||
] }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client", features = [ "nyxd-client" ] }
|
||||
|
||||
[build-dependencies]
|
||||
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] }
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
gA3NCDl+xOorR3heFVB47FlGunsZgS4RDX2M0IY73lc=
|
||||
@@ -1 +0,0 @@
|
||||
mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=
|
||||
@@ -1 +0,0 @@
|
||||
AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=
|
||||
@@ -1 +0,0 @@
|
||||
WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE=
|
||||
@@ -155,7 +155,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.forward_ack(forward_ack, client_address);
|
||||
}
|
||||
|
||||
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
|
||||
pub(crate) async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
// packet processor for vpn packets,
|
||||
@@ -174,47 +174,47 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.handle_processed_packet(processed_final_hop).await
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
mut self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
// pub(crate) async fn handle_connection(
|
||||
// mut self,
|
||||
// conn: TcpStream,
|
||||
// remote: SocketAddr,
|
||||
// mut shutdown: TaskClient,
|
||||
// ) {
|
||||
// debug!("Starting connection handler for {:?}", remote);
|
||||
// shutdown.mark_as_success();
|
||||
// let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
// while !shutdown.is_shutdown() {
|
||||
// tokio::select! {
|
||||
// biased;
|
||||
// _ = shutdown.recv() => {
|
||||
// log::trace!("ConnectionHandler: received shutdown");
|
||||
// }
|
||||
// framed_sphinx_packet = framed_conn.next() => {
|
||||
// match framed_sphinx_packet {
|
||||
// Some(Ok(framed_sphinx_packet)) => {
|
||||
// // TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// // synchronously under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
self.handle_received_packet(framed_sphinx_packet).await;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// // in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// // but we already handle multiple concurrent connections so if anything, making
|
||||
// // that change would only slow things down
|
||||
// self.handle_received_packet(framed_sphinx_packet).await;
|
||||
// }
|
||||
// Some(Err(err)) => {
|
||||
// error!(
|
||||
// "The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
// );
|
||||
// return;
|
||||
// }
|
||||
// None => break, // stream got closed by remote
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
}
|
||||
// info!(
|
||||
// "Closing connection from {:?}",
|
||||
// framed_conn.into_inner().peer_addr()
|
||||
// );
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -3,11 +3,14 @@
|
||||
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_task::TaskClient;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
@@ -20,12 +23,12 @@ impl Listener {
|
||||
Listener { address, shutdown }
|
||||
}
|
||||
|
||||
pub(crate) async fn run<St>(&mut self, connection_handler: ConnectionHandler<St>)
|
||||
pub(crate) async fn run<St>(&mut self, mut connection_handler: ConnectionHandler<St>)
|
||||
where
|
||||
St: Storage + Clone + 'static,
|
||||
{
|
||||
info!("Starting mixnet listener at {}", self.address);
|
||||
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
|
||||
let socket = match tokio::net::UdpSocket::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
@@ -33,19 +36,33 @@ impl Listener {
|
||||
}
|
||||
};
|
||||
|
||||
let mut framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("mixnet_handling::Listener: Received shutdown");
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok((framed_sphinx_packet, remote))) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
debug!("Handling packet from {remote:?}");
|
||||
connection_handler.handle_received_packet(framed_sphinx_packet).await;
|
||||
}
|
||||
Err(err) => warn!("failed to get client: {err}"),
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use crate::node::statistics::collector::GatewayStatisticsCollector;
|
||||
use crate::node::storage::Storage;
|
||||
use crate::node::wireguard::wireguard;
|
||||
use log::*;
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
@@ -29,8 +28,6 @@ pub(crate) mod client_handling;
|
||||
pub(crate) mod mixnet_handling;
|
||||
pub(crate) mod statistics;
|
||||
pub(crate) mod storage;
|
||||
mod wg;
|
||||
pub(crate) mod wireguard;
|
||||
|
||||
/// Wire up and create Gateway instance
|
||||
pub(crate) async fn create_gateway(config: Config) -> Gateway<PersistentStorage> {
|
||||
@@ -300,8 +297,6 @@ impl<St> Gateway<St> {
|
||||
Arc::new(coconut_verifier),
|
||||
);
|
||||
|
||||
tokio::spawn(wireguard());
|
||||
|
||||
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
|
||||
|
||||
self.wait_for_interrupt(shutdown).await
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
use bytes::Bytes;
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Event {
|
||||
/// Dumb event with no data.
|
||||
Dumb,
|
||||
/// IP packet received from the WireGuard tunnel that should be passed through to the corresponding virtual device/internet.
|
||||
/// Original implementation also has protocol here since it understands it, but we'll have to infer it downstream
|
||||
WgPacket(Bytes),
|
||||
/// IP packet to be sent through the WireGuard tunnel as crafted by the virtual device.
|
||||
IpPacket(Bytes),
|
||||
}
|
||||
|
||||
impl Display for Event {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Event::Dumb => {
|
||||
write!(f, "Dumb{{}}")
|
||||
}
|
||||
Event::WgPacket(data) => {
|
||||
let size = data.len();
|
||||
write!(f, "WgPacket{{ size={} }}", size)
|
||||
}
|
||||
Event::IpPacket(data) => {
|
||||
let size = data.len();
|
||||
write!(f, "IpPacket{{ size={} }}", size)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,437 +0,0 @@
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_recursion::async_recursion;
|
||||
use base64::engine::general_purpose;
|
||||
use base64::Engine as _;
|
||||
use boringtun::noise::errors::WireGuardError;
|
||||
use boringtun::noise::{Tunn, TunnResult};
|
||||
use etherparse::{InternetSlice, PacketBuilder, SlicedPacket, TransportSlice};
|
||||
use log::{debug, info, warn};
|
||||
use pnet::packet::ip::IpNextHeaderProtocols;
|
||||
use pnet::packet::ipv4::{Ipv4Packet, MutableIpv4Packet};
|
||||
use pnet::packet::{MutablePacket, Packet, PacketSize};
|
||||
use pnet::transport::{ipv4_packet_iter, transport_channel};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio::time::{sleep, timeout};
|
||||
use x25519_dalek::StaticSecret;
|
||||
|
||||
use crate::error;
|
||||
|
||||
use self::events::Event;
|
||||
|
||||
pub mod events;
|
||||
|
||||
const MAX_PACKET: usize = 65536;
|
||||
|
||||
/// A WireGuard tunnel. Encapsulates and decapsulates IP packets,
|
||||
/// recieves packets from the client on the udp_rx channel,
|
||||
/// and events from the internet on the eth_rx channel,
|
||||
/// sends data through udp socket or datalink sender directly.
|
||||
/// For now all tunnels recieve all events and filter on the source_peer_addr
|
||||
pub struct WireGuardTunnel {
|
||||
source_peer_addr: Arc<RwLock<Option<(Ipv4Addr, u16)>>>,
|
||||
/// `boringtun` peer/tunnel implementation, used for crypto & WG protocol.
|
||||
peer: Arc<Mutex<Tunn>>,
|
||||
udp: Arc<UdpSocket>,
|
||||
peer_endpoint: SocketAddr,
|
||||
bus_rx: tokio::sync::broadcast::Receiver<Event>,
|
||||
bus_tx: tokio::sync::broadcast::Sender<Event>,
|
||||
}
|
||||
|
||||
pub fn handle_l3_packet(data: &[u8], destination_addr: Ipv4Addr) -> Vec<u8> {
|
||||
let (mut tx, mut rx) = transport_channel(
|
||||
65535,
|
||||
pnet::transport::TransportChannelType::Layer3(IpNextHeaderProtocols::Tcp),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut rx_iterator = ipv4_packet_iter(&mut rx);
|
||||
|
||||
let mut must_send = true;
|
||||
let mut cnt = 0;
|
||||
while let Ok((packet, addr)) = rx_iterator.next() {
|
||||
if must_send {
|
||||
let data = data.to_vec();
|
||||
let incoming_packet = Ipv4Packet::new(&data).unwrap();
|
||||
let mut new_packet = vec![0; incoming_packet.packet_size()];
|
||||
let mut outgoing_packet = MutableIpv4Packet::new(&mut new_packet).unwrap();
|
||||
outgoing_packet.clone_from(&incoming_packet);
|
||||
info!(
|
||||
"Sending (ttl={}, proto={} from {} to {}({})",
|
||||
outgoing_packet.get_ttl(),
|
||||
outgoing_packet.get_next_level_protocol(),
|
||||
outgoing_packet.get_source(),
|
||||
outgoing_packet.get_destination(),
|
||||
destination_addr
|
||||
);
|
||||
outgoing_packet.set_source("95.217.227.118".parse().unwrap());
|
||||
let sent = tx
|
||||
.send_to(outgoing_packet, IpAddr::V4(destination_addr))
|
||||
.unwrap();
|
||||
info!("Sent L3 packet ({sent})");
|
||||
must_send = false;
|
||||
continue;
|
||||
}
|
||||
cnt += 1;
|
||||
let source = packet.get_source();
|
||||
let destination = packet.get_destination();
|
||||
info!("Ignoring packet from {source}");
|
||||
|
||||
if source == destination_addr {
|
||||
info!("({addr}){source} -> {destination}");
|
||||
return packet.payload().to_vec();
|
||||
}
|
||||
if cnt >= 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
vec![]
|
||||
}
|
||||
|
||||
impl WireGuardTunnel {
|
||||
async fn set_source_peer_addr(&self, source_addr: Ipv4Addr, source_port: Option<u16>) {
|
||||
{
|
||||
if self.source_peer_addr.read().await.is_some() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
let mut source_peer_addr = self.source_peer_addr.write().await;
|
||||
*source_peer_addr = Some((source_addr, source_port.unwrap_or(0)))
|
||||
}
|
||||
|
||||
pub async fn spin_off(mut self) {
|
||||
info!("Spun off WG tunnel");
|
||||
// We'll receive both inbound and outbound packages on the same channel, and filter on packet type
|
||||
loop {
|
||||
tokio::select! {
|
||||
packet = self.bus_rx.recv() => {
|
||||
match packet {
|
||||
Ok(p) => {
|
||||
info!("{p}");
|
||||
match p {
|
||||
Event::IpPacket(data) => self.consume_eth(&data).await,
|
||||
Event::WgPacket(data) => self.consume_wg(&data).await,
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
Err(e) => error!("{e}")
|
||||
}
|
||||
},
|
||||
_ = sleep(Duration::from_millis(5))=> {
|
||||
let mut send_buf = [0u8; MAX_PACKET];
|
||||
let tun_result = {
|
||||
let mut tun = timeout(Duration::from_millis(100), self.peer()).await.unwrap();
|
||||
tun.update_timers(&mut send_buf)
|
||||
};
|
||||
self.handle_routine_tun_result(tun_result).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn consume_eth(&self, data: &[u8]) {
|
||||
let parsed_packet = SlicedPacket::from_ethernet(data).unwrap();
|
||||
debug!("{parsed_packet:?}");
|
||||
let (source_addr, destination_addr) = match parsed_packet.ip.unwrap() {
|
||||
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
let (source_port, destination_port, icmp_type) = match parsed_packet.transport.as_ref() {
|
||||
Some(TransportSlice::Tcp(tcp)) => {
|
||||
(Some(tcp.source_port()), Some(tcp.destination_port()), None)
|
||||
}
|
||||
Some(TransportSlice::Udp(udp)) => {
|
||||
(Some(udp.source_port()), Some(udp.destination_port()), None)
|
||||
}
|
||||
Some(TransportSlice::Icmpv4(icmp)) => (None, None, Some(icmp.icmp_type())),
|
||||
Some(TransportSlice::Icmpv6(_)) => panic!("ICMPv6"),
|
||||
Some(TransportSlice::Unknown(_)) => panic!("Unknown"),
|
||||
None => panic!("No transport layer"),
|
||||
};
|
||||
debug!(
|
||||
"{:?}:{:?} -> {:?}:{:?} - ({:?})",
|
||||
source_addr, source_port, destination_addr, destination_port, icmp_type
|
||||
);
|
||||
|
||||
if destination_addr == self.source_peer_addr.read().await.unwrap().0 {
|
||||
info!("Sending {} to {}", data.len(), self.peer_endpoint);
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
let response_packet_builder =
|
||||
PacketBuilder::ipv4(source_addr.octets(), destination_addr.octets(), 64);
|
||||
|
||||
let mut response_packet =
|
||||
Vec::<u8>::with_capacity(response_packet_builder.size(parsed_packet.payload.len()));
|
||||
|
||||
match parsed_packet.transport.as_ref() {
|
||||
Some(TransportSlice::Udp(udp)) => {
|
||||
debug!("UDP: {}, {}", udp.length(), udp.destination_port());
|
||||
let response_packet_builder =
|
||||
response_packet_builder.udp(source_port.unwrap(), destination_port.unwrap());
|
||||
response_packet_builder
|
||||
.write(&mut response_packet, parsed_packet.payload)
|
||||
.unwrap();
|
||||
}
|
||||
Some(TransportSlice::Tcp(tcp)) => {
|
||||
let response_packet_builder = response_packet_builder.tcp(
|
||||
destination_port.unwrap(),
|
||||
source_port.unwrap(),
|
||||
tcp.sequence_number(),
|
||||
tcp.window_size(),
|
||||
);
|
||||
response_packet_builder
|
||||
.write(&mut response_packet, parsed_packet.payload)
|
||||
.unwrap();
|
||||
}
|
||||
Some(TransportSlice::Icmpv4(icmp)) => {
|
||||
info!("{:?}", icmp.icmp_type());
|
||||
let response_packet_builder = response_packet_builder.icmpv4(icmp.icmp_type());
|
||||
response_packet_builder
|
||||
.write(&mut response_packet, parsed_packet.payload)
|
||||
.unwrap();
|
||||
}
|
||||
None => {}
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
|
||||
let encapsulated_response_packet = self.encapsulate_packet(&response_packet).await;
|
||||
// let packet = Tunn::parse_incoming_packet(&response_packet).unwrap();
|
||||
// info!("Sending {packet:?} to {addr}");
|
||||
let sent = self
|
||||
.udp
|
||||
.send_to(&encapsulated_response_packet, self.peer_endpoint)
|
||||
.await
|
||||
.unwrap();
|
||||
info!(
|
||||
"[{}:{} ({sent})-> {}:{}] -> {}",
|
||||
destination_addr,
|
||||
destination_port.unwrap_or(0),
|
||||
source_addr,
|
||||
source_port.unwrap_or(0),
|
||||
self.peer_endpoint
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: extend to work with IPv6
|
||||
pub async fn produce_eth(&self, packet_bytes: &[u8]) -> Vec<u8> {
|
||||
let outgoing_packet = SlicedPacket::from_ip(packet_bytes).unwrap();
|
||||
let (source_addr, destination_addr) = match outgoing_packet.ip.unwrap() {
|
||||
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
let (source_port, destination_port, icmp_type) = match outgoing_packet.transport.as_ref() {
|
||||
Some(TransportSlice::Tcp(tcp)) => {
|
||||
(Some(tcp.source_port()), Some(tcp.destination_port()), None)
|
||||
}
|
||||
Some(TransportSlice::Udp(udp)) => {
|
||||
(Some(udp.source_port()), Some(udp.destination_port()), None)
|
||||
}
|
||||
Some(TransportSlice::Icmpv4(icmp)) => (None, None, Some(icmp.icmp_type())),
|
||||
Some(TransportSlice::Icmpv6(_)) => panic!("ICMPv6"),
|
||||
Some(TransportSlice::Unknown(_)) => panic!("Unknown"),
|
||||
None => panic!("No transport layer"),
|
||||
};
|
||||
info!(
|
||||
"{:?}:{:?} -> {:?}:{:?} - ({:?})",
|
||||
source_addr, source_port, destination_addr, destination_port, icmp_type
|
||||
);
|
||||
self.set_source_peer_addr(source_addr, source_port).await;
|
||||
handle_l3_packet(packet_bytes, destination_addr)
|
||||
}
|
||||
|
||||
/// WireGuard consumption task. Receives encrypted packets from the WireGuard peer,
|
||||
/// decapsulates them, and dispatches newly received IP packets.
|
||||
async fn consume_wg(&self, data: &[u8]) {
|
||||
let mut send_buf = [0u8; MAX_PACKET];
|
||||
let mut peer = self.peer().await;
|
||||
match peer.decapsulate(None, data, &mut send_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
match self.udp.send_to(packet, self.peer_endpoint).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {:?}", e);
|
||||
}
|
||||
};
|
||||
loop {
|
||||
let mut send_buf = [0u8; MAX_PACKET];
|
||||
match peer.decapsulate(None, &[], &mut send_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
match self.udp.send_to(packet, self.peer_endpoint).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {:?}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
TunnResult::WriteToTunnelV4(packet, _) | TunnResult::WriteToTunnelV6(packet, _) => {
|
||||
info!(
|
||||
"WireGuard endpoint sent IP packet of {} bytes",
|
||||
packet.len()
|
||||
);
|
||||
let response = self.produce_eth(packet).await;
|
||||
if !response.is_empty() {
|
||||
self.bus_tx.send(Event::IpPacket(response.into())).unwrap();
|
||||
}
|
||||
}
|
||||
x => warn!("{x:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn encapsulate_packet(&self, payload: &[u8]) -> Vec<u8> {
|
||||
let len = 148.max(payload.len() + 32);
|
||||
let mut dst = vec![0; len];
|
||||
let mut t = self.peer().await;
|
||||
let packet = t.encapsulate(payload, &mut dst);
|
||||
match packet {
|
||||
TunnResult::WriteToNetwork(p) => p.to_vec(),
|
||||
unexpected => {
|
||||
error!("{:?}", unexpected);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn peer(&self) -> tokio::sync::MutexGuard<'_, Tunn> {
|
||||
self.peer.lock().await
|
||||
}
|
||||
|
||||
pub async fn new(
|
||||
peer_static_public: x25519_dalek::PublicKey,
|
||||
udp: Arc<UdpSocket>,
|
||||
peer_endpoint: SocketAddr,
|
||||
bus_tx: tokio::sync::broadcast::Sender<Event>,
|
||||
) -> Self {
|
||||
let peer = Arc::new(Mutex::new(Self::create_tunnel(peer_static_public)));
|
||||
|
||||
Self {
|
||||
source_peer_addr: Arc::new(RwLock::new(None)),
|
||||
peer,
|
||||
udp,
|
||||
peer_endpoint,
|
||||
bus_rx: bus_tx.subscribe(),
|
||||
bus_tx,
|
||||
}
|
||||
}
|
||||
|
||||
fn create_tunnel(peer_static_public: x25519_dalek::PublicKey) -> Tunn {
|
||||
let secret_bytes: [u8; 32] = general_purpose::STANDARD
|
||||
.decode("AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
let private_key = StaticSecret::try_from(secret_bytes).unwrap();
|
||||
|
||||
Tunn::new(private_key, peer_static_public, None, None, 0, None).unwrap()
|
||||
}
|
||||
|
||||
/// Encapsulates and sends an IP packet back to the WireGuard client.
|
||||
pub async fn send_ip_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
|
||||
let mut send_buf = [0u8; MAX_PACKET];
|
||||
match self.peer().await.encapsulate(packet, &mut send_buf) {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
self.udp.send_to(packet, self.peer_endpoint).await.unwrap();
|
||||
debug!(
|
||||
"Sent {} bytes to WireGuard endpoint (encrypted IP packet)",
|
||||
packet.len()
|
||||
);
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
error!("Failed to encapsulate IP packet: {:?}", e);
|
||||
}
|
||||
TunnResult::Done => {
|
||||
// Ignored
|
||||
}
|
||||
other => {
|
||||
error!(
|
||||
"Unexpected WireGuard state during encapsulation: {:?}",
|
||||
other
|
||||
);
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
async fn handle_routine_tun_result<'a: 'async_recursion>(&self, result: TunnResult<'a>) -> () {
|
||||
match result {
|
||||
TunnResult::WriteToNetwork(packet) => {
|
||||
info!(
|
||||
"Sending routine packet of {} bytes to WireGuard endpoint",
|
||||
packet.len()
|
||||
);
|
||||
match self.udp.send_to(packet, self.peer_endpoint).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to send routine packet to WireGuard endpoint: {:?}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
TunnResult::Err(WireGuardError::ConnectionExpired) => {
|
||||
warn!("Wireguard handshake has expired!");
|
||||
|
||||
let mut buf = vec![0u8; MAX_PACKET];
|
||||
let result = self
|
||||
.peer()
|
||||
.await
|
||||
.format_handshake_initiation(&mut buf[..], false);
|
||||
|
||||
self.handle_routine_tun_result(result).await
|
||||
}
|
||||
TunnResult::Err(e) => {
|
||||
error!(
|
||||
"Failed to prepare routine packet for WireGuard endpoint: {:?}",
|
||||
e
|
||||
);
|
||||
}
|
||||
TunnResult::Done => {
|
||||
// Sleep for a bit
|
||||
// tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
other => {
|
||||
warn!("Unexpected WireGuard routine task state: {:?}", other);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// fn route_protocol(&self, packet: &[u8]) -> Option<Protocol> {
|
||||
// match IpVersion::of_packet(packet) {
|
||||
// Ok(IpVersion::Ipv4) => Ipv4Packet::new_checked(&packet)
|
||||
// .ok()
|
||||
// // Only care if the packet is destined for this tunnel
|
||||
// .filter(|packet| Ipv4Addr::from(packet.dst_addr()) == self.source_peer_ip)
|
||||
// .and_then(|packet| match packet.next_header() {
|
||||
// IpProtocol::Tcp => Some(Protocol::Tcp),
|
||||
// IpProtocol::Udp => Some(Protocol::Udp),
|
||||
// // Unrecognized protocol, so we cannot determine where to route
|
||||
// _ => None,
|
||||
// }),
|
||||
// Ok(IpVersion::Ipv6) => Ipv6Packet::new_checked(&packet)
|
||||
// .ok()
|
||||
// // Only care if the packet is destined for this tunnel
|
||||
// .filter(|packet| Ipv6Addr::from(packet.dst_addr()) == self.source_peer_ip)
|
||||
// .and_then(|packet| match packet.next_header() {
|
||||
// IpProtocol::Tcp => Some(Protocol::Tcp),
|
||||
// IpProtocol::Udp => Some(Protocol::Udp),
|
||||
// // Unrecognized protocol, so we cannot determine where to route
|
||||
// _ => None,
|
||||
// }),
|
||||
// _ => None,
|
||||
// }
|
||||
// }
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
use base64::engine::general_purpose;
|
||||
use base64::Engine as _;
|
||||
use log::{error, info};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::broadcast;
|
||||
use x25519_dalek::{PublicKey, StaticSecret};
|
||||
|
||||
use crate::node::wg::events::Event;
|
||||
use crate::node::wg::WireGuardTunnel;
|
||||
|
||||
pub async fn wireguard() {
|
||||
let wg_address = "0.0.0.0:51820";
|
||||
let sock = Arc::new(UdpSocket::bind(wg_address).await.unwrap());
|
||||
info!("wg listening on {wg_address}");
|
||||
|
||||
// Secret key ofthe gateway, we'll need a way to generate this from the IdentityKey, might be enough to do some base58 -> base64 conversion
|
||||
let secret_bytes: [u8; 32] = general_purpose::STANDARD
|
||||
.decode("AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
// Hardcoded peer public key, we'll need a way to register those, private key for that one is `aMUcuAgTiFCHQ/fHqEQRvpLWBxh8sKA7f7lSyWymrGE=`
|
||||
// Wireguard configuration that works with this setup is below, this needs to be put into the wireguard client of choice.
|
||||
// Working in this case means that they go through the handshake, and client
|
||||
// starts sending data packets to the gateway.
|
||||
//
|
||||
// [Interface]
|
||||
// PrivateKey = aMUcuAgTiFCHQ/fHqEQRvpLWBxh8sKA7f7lSyWymrGE=
|
||||
// Address = 10.8.0.0/24
|
||||
// DNS = 1.1.1.1
|
||||
//
|
||||
// [Peer]
|
||||
// PublicKey = y6/iGYraJjON6pw9fcBa5vLRbGsQqprFLfWKyJQnlWs=
|
||||
// AllowedIPs = 0.0.0.0/0
|
||||
// Endpoint = 127.0.0.1:51820
|
||||
let peer_public_bytes: [u8; 32] = general_purpose::STANDARD
|
||||
.decode("mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=")
|
||||
.unwrap()
|
||||
.try_into()
|
||||
.unwrap();
|
||||
let peer_public = PublicKey::from(peer_public_bytes);
|
||||
let secret = StaticSecret::try_from(secret_bytes).unwrap();
|
||||
let public = PublicKey::from(&secret);
|
||||
info!(
|
||||
"wg public key: {}",
|
||||
general_purpose::STANDARD.encode(public)
|
||||
);
|
||||
|
||||
let mut buf = [0; 1024];
|
||||
let mut peers = HashSet::new();
|
||||
|
||||
let (bus_tx, _) = broadcast::channel(128);
|
||||
|
||||
while let Ok((len, addr)) = sock.recv_from(&mut buf).await {
|
||||
info!("Received {} bytes from {}", len, addr);
|
||||
if peers.contains(&addr) {
|
||||
bus_tx
|
||||
.send(Event::WgPacket(buf[..len].to_vec().into()))
|
||||
.map_err(|e| error!("{e}"))
|
||||
.unwrap();
|
||||
} else {
|
||||
info!("New peer with endpoint {addr}");
|
||||
let tun =
|
||||
WireGuardTunnel::new(peer_public, Arc::clone(&sock), addr, bus_tx.clone()).await;
|
||||
peers.insert(addr);
|
||||
tokio::spawn(tun.spin_off());
|
||||
bus_tx
|
||||
.send(Event::WgPacket(buf[..len].to_vec().into()))
|
||||
.map_err(|e| error!("{e}"))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
panic!("Not OK");
|
||||
}
|
||||
+1
-1
@@ -33,7 +33,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sysinfo = "0.27.7"
|
||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec", "net"] }
|
||||
toml = "0.5.8"
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
cfg-if = "1.0.0"
|
||||
|
||||
@@ -54,7 +54,7 @@ impl ConnectionHandler {
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, framed_sphinx_packet), fields(cpucycles))
|
||||
)]
|
||||
fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
pub(crate) async fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
// packet processor for vpn packets,
|
||||
@@ -78,50 +78,50 @@ impl ConnectionHandler {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously (without delaying inside of course,
|
||||
// delay is moved to a global DelayQueue)
|
||||
// under higher load in single and multi-threaded situation.
|
||||
// pub(crate) async fn handle_connection(
|
||||
// self,
|
||||
// conn: TcpStream,
|
||||
// remote: SocketAddr,
|
||||
// mut shutdown: TaskClient,
|
||||
// ) {
|
||||
// debug!("Starting connection handler for {:?}", remote);
|
||||
// //shutdown.mark_as_success();
|
||||
// //let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
// while !shutdown.is_shutdown() {
|
||||
// tokio::select! {
|
||||
// biased;
|
||||
// _ = shutdown.recv() => {
|
||||
// log::trace!("ConnectionHandler: received shutdown");
|
||||
// }
|
||||
// framed_sphinx_packet = framed_conn.next() => {
|
||||
// match framed_sphinx_packet {
|
||||
// Some(Ok(framed_sphinx_packet)) => {
|
||||
// // TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// // synchronously (without delaying inside of course,
|
||||
// // delay is moved to a global DelayQueue)
|
||||
// // under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
self.handle_received_packet(framed_sphinx_packet);
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// // in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// // but we already handle multiple concurrent connections so if anything, making
|
||||
// // that change would only slow things down
|
||||
// self.handle_received_packet(framed_sphinx_packet);
|
||||
// }
|
||||
// Some(Err(err)) => {
|
||||
// error!(
|
||||
// "{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
// );
|
||||
// return;
|
||||
// }
|
||||
// None => break, // stream got closed by remote
|
||||
// }
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
log::trace!("ConnectionHandler: Exiting");
|
||||
}
|
||||
// info!(
|
||||
// "Closing connection from {:?}",
|
||||
// framed_conn.into_inner().peer_addr()
|
||||
// );
|
||||
// log::trace!("ConnectionHandler: Exiting");
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::listener::connection_handler::ConnectionHandler;
|
||||
use futures::StreamExt;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::error;
|
||||
|
||||
@@ -25,7 +28,7 @@ impl Listener {
|
||||
|
||||
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
||||
log::trace!("Starting Listener");
|
||||
let listener = match TcpListener::bind(self.address).await {
|
||||
let socket = match UdpSocket::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
@@ -33,19 +36,36 @@ impl Listener {
|
||||
}
|
||||
};
|
||||
|
||||
let mut framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("Listener: Received shutdown");
|
||||
}
|
||||
connection = listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
},
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok((framed_sphinx_packet, remote))) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously (without delaying inside of course,
|
||||
// delay is moved to a global DelayQueue)
|
||||
// under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
debug!("Handling packet from {remote:?}");
|
||||
let connection_handler_clone = connection_handler.clone();
|
||||
tokio::spawn(async move {connection_handler_clone.handle_received_packet(framed_sphinx_packet).await });
|
||||
}
|
||||
Err(err) => warn!("Failed to accept incoming connection - {err}"),
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
Generated
+2
-17
@@ -3220,7 +3220,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-bin-common"
|
||||
version = "0.6.0"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"clap",
|
||||
@@ -3235,7 +3235,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client-core"
|
||||
version = "1.1.15"
|
||||
version = "1.1.14"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.21.2",
|
||||
@@ -3365,7 +3365,6 @@ dependencies = [
|
||||
"nym-credential-storage",
|
||||
"nym-crypto",
|
||||
"nym-socks5-client-core",
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"pretty_env_logger",
|
||||
"rand 0.8.5",
|
||||
@@ -3736,7 +3735,6 @@ dependencies = [
|
||||
"nym-pemstore",
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -3762,7 +3760,6 @@ dependencies = [
|
||||
"nym-crypto",
|
||||
"nym-sphinx-addressing",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -3793,7 +3790,6 @@ dependencies = [
|
||||
"nym-sphinx-chunking",
|
||||
"nym-sphinx-forwarding",
|
||||
"nym-sphinx-params",
|
||||
"nym-sphinx-routing",
|
||||
"nym-sphinx-types",
|
||||
"nym-topology",
|
||||
"rand 0.7.3",
|
||||
@@ -3910,7 +3906,6 @@ dependencies = [
|
||||
"nym-service-provider-directory-common",
|
||||
"nym-vesting-contract",
|
||||
"nym-vesting-contract-common",
|
||||
"openssl",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -4050,15 +4045,6 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-src"
|
||||
version = "111.26.0+1.1.1u"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efc62c9f12b22b8f5208c23a7200a442b2e5999f8bdf80233852122b5a4f6f37"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.82"
|
||||
@@ -4068,7 +4054,6 @@ dependencies = [
|
||||
"autocfg",
|
||||
"cc",
|
||||
"libc",
|
||||
"openssl-src",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
@@ -53,7 +53,6 @@ nym-crypto = { path = "../../../common/crypto" }
|
||||
nym-credential-storage = { path = "../../../common/credential-storage" }
|
||||
nym-bin-common = { path = "../../../common/bin-common"}
|
||||
nym-socks5-client-core = { path = "../../../common/socks5-client-core" }
|
||||
nym-sphinx = { path = "../../../common/nymsphinx" }
|
||||
nym-task = { path = "../../../common/task" }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -160,11 +160,7 @@ pub async fn init_socks5_config(provider_address: String, chosen_gateway_id: Str
|
||||
config.core.base.client.nym_api_urls = nym_config_common::parse_urls(&raw_validators);
|
||||
}
|
||||
|
||||
let gateway_setup = if register_gateway {
|
||||
GatewaySetup::new_fresh(Some(chosen_gateway_id), None)
|
||||
} else {
|
||||
GatewaySetup::MustLoad
|
||||
};
|
||||
let gateway_setup = GatewaySetup::new_fresh(Some(chosen_gateway_id), None);
|
||||
|
||||
// Setup gateway by either registering a new one, or reusing exiting keys
|
||||
let key_store = OnDiskKeys::new(config.storage_paths.common_paths.keys.clone());
|
||||
|
||||
@@ -25,11 +25,6 @@ mod tasks;
|
||||
mod window;
|
||||
|
||||
fn main() {
|
||||
if std::env::var("NYM_CONNECT_ENABLE_MEDIUM").is_ok() {
|
||||
std::env::set_var("NYM_CONNECT_DISABLE_COVER", "1");
|
||||
std::env::set_var("NYM_CONNECT_ENABLE_MIXED_SIZE_PACKETS", "1");
|
||||
std::env::set_var("NYM_CONNECT_DISABLE_PER_HOP_DELAYS", "1");
|
||||
}
|
||||
setup_env(None);
|
||||
println!("Starting up...");
|
||||
|
||||
|
||||
@@ -4,10 +4,8 @@ use nym_client_core::client::base_client::storage::{MixnetClientStorage, OnDiskP
|
||||
use nym_client_core::{config::GatewayEndpointConfig, error::ClientCoreStatusMessage};
|
||||
use nym_socks5_client_core::NymClient as Socks5NymClient;
|
||||
use nym_socks5_client_core::Socks5ControlMessageSender;
|
||||
use nym_sphinx::params::PacketSize;
|
||||
use nym_task::manager::TaskStatus;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tap::TapFallible;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@@ -41,7 +39,7 @@ pub async fn start_nym_socks5_client(
|
||||
GatewayEndpointConfig,
|
||||
)> {
|
||||
log::info!("Loading config from file: {id}");
|
||||
let mut config = Config::read_from_default_path(id)
|
||||
let config = Config::read_from_default_path(id)
|
||||
.tap_err(|_| log::warn!("Failed to load configuration file"))?;
|
||||
|
||||
let storage =
|
||||
@@ -55,21 +53,6 @@ pub async fn start_nym_socks5_client(
|
||||
.expect("failed to load gateway details")
|
||||
.into();
|
||||
|
||||
// Disable both the loop cover traffic that runs in the background as well as the Poisson
|
||||
// process that injects cover traffic into the traffic stream.
|
||||
if std::env::var("NYM_CONNECT_DISABLE_COVER").is_ok() {
|
||||
config.core.base.set_no_cover_traffic();
|
||||
}
|
||||
|
||||
if std::env::var("NYM_CONNECT_ENABLE_MIXED_SIZE_PACKETS").is_ok() {
|
||||
config.core.base.debug.traffic.secondary_packet_size = Some(PacketSize::ExtendedPacket16);
|
||||
}
|
||||
|
||||
if std::env::var("NYM_CONNECT_DISABLE_PER_HOP_DELAY").is_ok() {
|
||||
config.core.base.debug.traffic.average_packet_delay = Duration::ZERO;
|
||||
config.core.base.debug.acknowledgements.average_ack_delay = Duration::ZERO;
|
||||
}
|
||||
|
||||
log::info!("Starting socks5 client");
|
||||
|
||||
// Channel to send control messages to the socks5 client
|
||||
|
||||
@@ -2,7 +2,6 @@ plugins {
|
||||
id 'com.android.application'
|
||||
id 'org.jetbrains.kotlin.android'
|
||||
id 'org.jetbrains.kotlin.plugin.serialization' version '1.8.21'
|
||||
id "io.sentry.android.gradle" version "3.11.0"
|
||||
}
|
||||
|
||||
android {
|
||||
@@ -102,20 +101,18 @@ dependencies {
|
||||
|
||||
implementation 'androidx.core:core-ktx:1.10.1'
|
||||
implementation 'androidx.lifecycle:lifecycle-runtime-ktx:2.6.1'
|
||||
implementation 'androidx.activity:activity-compose:1.7.2'
|
||||
implementation 'androidx.activity:activity-compose:1.5.1'
|
||||
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4'
|
||||
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4'
|
||||
implementation 'org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0'
|
||||
implementation platform('androidx.compose:compose-bom:2022.10.00')
|
||||
implementation 'androidx.lifecycle:lifecycle-viewmodel-compose:2.6.1'
|
||||
implementation 'androidx.navigation:navigation-compose:2.6.0'
|
||||
implementation 'androidx.compose.runtime:runtime-livedata'
|
||||
implementation 'androidx.compose.ui:ui'
|
||||
implementation 'androidx.compose.ui:ui-graphics'
|
||||
implementation 'androidx.compose.ui:ui-tooling-preview'
|
||||
implementation 'androidx.compose.material3:material3'
|
||||
implementation 'androidx.work:work-runtime-ktx:2.8.1'
|
||||
implementation 'androidx.datastore:datastore-preferences:1.0.0'
|
||||
testImplementation 'junit:junit:4.13.2'
|
||||
androidTestImplementation 'androidx.test.ext:junit:1.1.3'
|
||||
androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
android:roundIcon="@mipmap/ic_launcher_round"
|
||||
android:supportsRtl="true"
|
||||
android:theme="@style/Theme.Nyms5"
|
||||
android:enableOnBackInvokedCallback="true"
|
||||
tools:targetApi="31">
|
||||
<activity
|
||||
android:name=".MainActivity"
|
||||
@@ -35,20 +34,6 @@
|
||||
android:name="androidx.startup.InitializationProvider"
|
||||
android:authorities="${applicationId}.androidx-startup"
|
||||
tools:node="remove" />
|
||||
|
||||
<!-- Sentry -->
|
||||
<meta-data
|
||||
android:name="io.sentry.auto-init"
|
||||
android:value="false" />
|
||||
<!-- enable screenshot for crashes -->
|
||||
<meta-data
|
||||
android:name="io.sentry.attach-screenshot"
|
||||
android:value="true" />
|
||||
<!-- enable view hierarchy for crashes -->
|
||||
<meta-data
|
||||
android:name="io.sentry.attach-view-hierarchy"
|
||||
android:value="true" />
|
||||
|
||||
</application>
|
||||
|
||||
</manifest>
|
||||
|
||||
@@ -1,21 +1,9 @@
|
||||
package net.nymtech.nyms5
|
||||
|
||||
import android.app.Application
|
||||
import android.content.Context
|
||||
import android.util.Log
|
||||
import androidx.datastore.core.DataStore
|
||||
import androidx.datastore.preferences.core.Preferences
|
||||
import androidx.datastore.preferences.core.booleanPreferencesKey
|
||||
import androidx.datastore.preferences.preferencesDataStore
|
||||
import androidx.work.Configuration
|
||||
import androidx.work.DelegatingWorkerFactory
|
||||
import io.sentry.android.core.SentryAndroid
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")
|
||||
val monitoringKey = booleanPreferencesKey("monitoring")
|
||||
|
||||
class App : Application(), Configuration.Provider {
|
||||
companion object {
|
||||
@@ -25,30 +13,6 @@ class App : Application(), Configuration.Provider {
|
||||
|
||||
private val tag = "App"
|
||||
|
||||
override fun onCreate() {
|
||||
super.onCreate()
|
||||
val app = this
|
||||
|
||||
runBlocking {
|
||||
val monitoring = applicationContext.dataStore.data.map { preferences ->
|
||||
preferences[monitoringKey] ?: false
|
||||
}.first()
|
||||
|
||||
if (monitoring) {
|
||||
Log.i(tag, "Performance monitoring and error reporting enabled")
|
||||
SentryAndroid.init(app) { options ->
|
||||
options.dsn =
|
||||
"https://6872f5818bc147ef9c0fce114fcaac8a@o967446.ingest.sentry.io/4505306218102784"
|
||||
options.enableAllAutoBreadcrumbs(true)
|
||||
|
||||
// TODO should be adjusted in production env
|
||||
options.tracesSampleRate = 1.0
|
||||
options.profilesSampleRate = 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getWorkManagerConfiguration(): Configuration {
|
||||
val workerFactory = DelegatingWorkerFactory()
|
||||
// pass in the NymProxy class instance
|
||||
|
||||
@@ -14,21 +14,11 @@ import androidx.compose.foundation.layout.fillMaxWidth
|
||||
import androidx.compose.foundation.layout.height
|
||||
import androidx.compose.foundation.layout.padding
|
||||
import androidx.compose.foundation.layout.width
|
||||
import androidx.compose.foundation.rememberScrollState
|
||||
import androidx.compose.foundation.shape.RoundedCornerShape
|
||||
import androidx.compose.foundation.verticalScroll
|
||||
import androidx.compose.material.icons.Icons
|
||||
import androidx.compose.material.icons.filled.ArrowBack
|
||||
import androidx.compose.material.icons.filled.Menu
|
||||
import androidx.compose.material3.CenterAlignedTopAppBar
|
||||
import androidx.compose.material3.DropdownMenu
|
||||
import androidx.compose.material3.DropdownMenuItem
|
||||
import androidx.compose.material3.ExperimentalMaterial3Api
|
||||
import androidx.compose.material3.Icon
|
||||
import androidx.compose.material3.IconButton
|
||||
import androidx.compose.material3.LinearProgressIndicator
|
||||
import androidx.compose.material3.MaterialTheme
|
||||
import androidx.compose.material3.Scaffold
|
||||
import androidx.compose.material3.Surface
|
||||
import androidx.compose.material3.Switch
|
||||
import androidx.compose.material3.Text
|
||||
@@ -44,8 +34,6 @@ import androidx.lifecycle.repeatOnLifecycle
|
||||
import kotlinx.coroutines.launch
|
||||
import net.nymtech.nyms5.ui.theme.NymTheme
|
||||
import androidx.compose.runtime.getValue
|
||||
import androidx.compose.runtime.remember
|
||||
import androidx.compose.runtime.rememberCoroutineScope
|
||||
import androidx.compose.runtime.saveable.rememberSaveable
|
||||
import androidx.compose.ui.Alignment
|
||||
import androidx.compose.ui.graphics.Color
|
||||
@@ -54,17 +42,8 @@ import androidx.compose.ui.res.painterResource
|
||||
import androidx.compose.ui.res.stringResource
|
||||
import androidx.compose.ui.text.AnnotatedString
|
||||
import androidx.compose.ui.text.font.FontStyle
|
||||
import androidx.datastore.core.DataStore
|
||||
import androidx.datastore.preferences.core.Preferences
|
||||
import androidx.datastore.preferences.core.edit
|
||||
import androidx.navigation.compose.NavHost
|
||||
import androidx.navigation.compose.composable
|
||||
import androidx.navigation.compose.currentBackStackEntryAsState
|
||||
import androidx.navigation.compose.rememberNavController
|
||||
import androidx.work.WorkInfo
|
||||
import androidx.work.WorkManager
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.map
|
||||
|
||||
class MainActivity : ComponentActivity() {
|
||||
private val tag = "MainActivity"
|
||||
@@ -102,16 +81,18 @@ class MainActivity : ComponentActivity() {
|
||||
|
||||
lifecycleScope.launch {
|
||||
repeatOnLifecycle(Lifecycle.State.STARTED) {
|
||||
Log.d(tag, "____UI recompose")
|
||||
applicationContext.dataStore.data.map { preferences ->
|
||||
preferences[monitoringKey] ?: false
|
||||
}.collect { monitoring ->
|
||||
viewModel.uiState.collect {
|
||||
setContent {
|
||||
NymTheme {
|
||||
Log.d(tag, "____uiState collect")
|
||||
viewModel.uiState.collect {
|
||||
setContent {
|
||||
NymTheme {
|
||||
// A surface container using the 'background' color from the theme
|
||||
Surface(
|
||||
modifier = Modifier.fillMaxSize(),
|
||||
color = MaterialTheme.colorScheme.background
|
||||
) {
|
||||
val loading = it.loading
|
||||
|
||||
HomeScreen(it, monitoring, applicationContext.dataStore) {
|
||||
S5ClientSwitch(it.connected, loading, {
|
||||
if (!loading) {
|
||||
when {
|
||||
it -> {
|
||||
@@ -125,7 +106,7 @@ class MainActivity : ComponentActivity() {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,90 +114,6 @@ class MainActivity : ComponentActivity() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onStart() {
|
||||
super.onStart()
|
||||
viewModel.checkStateSync()
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalMaterial3Api::class)
|
||||
@Composable
|
||||
fun HomeScreen(
|
||||
proxyState: MainViewModel.ProxyState,
|
||||
monitoring: Boolean,
|
||||
dataStore: DataStore<Preferences>,
|
||||
onSwitch: (value: Boolean) -> Unit,
|
||||
) {
|
||||
val navController = rememberNavController()
|
||||
var expanded by remember { mutableStateOf(false) }
|
||||
val scope = rememberCoroutineScope()
|
||||
|
||||
Scaffold(topBar = {
|
||||
CenterAlignedTopAppBar(
|
||||
title = {
|
||||
Text(stringResource(R.string.app_name))
|
||||
},
|
||||
navigationIcon = {
|
||||
val navBackStackEntry by navController.currentBackStackEntryAsState()
|
||||
val currentRoute = navBackStackEntry?.destination?.route
|
||||
|
||||
if (currentRoute === "proxy") {
|
||||
IconButton(onClick = { expanded = true }) {
|
||||
Icon(
|
||||
imageVector = Icons.Filled.Menu,
|
||||
contentDescription = "Main menu"
|
||||
)
|
||||
}
|
||||
DropdownMenu(
|
||||
expanded = expanded,
|
||||
onDismissRequest = { expanded = false }
|
||||
) {
|
||||
DropdownMenuItem(onClick = {
|
||||
navController.navigate("monitoring") {
|
||||
popUpTo("proxy")
|
||||
}
|
||||
expanded = false
|
||||
}, text = {
|
||||
Text("Error reporting")
|
||||
})
|
||||
}
|
||||
} else {
|
||||
IconButton(onClick = {
|
||||
navController.navigate("proxy") {
|
||||
popUpTo("proxy")
|
||||
}
|
||||
}) {
|
||||
Icon(
|
||||
imageVector = Icons.Filled.ArrowBack,
|
||||
contentDescription = "Back home"
|
||||
)
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}) { contentPadding ->
|
||||
NavHost(
|
||||
navController = navController,
|
||||
startDestination = "proxy",
|
||||
modifier = Modifier.padding(contentPadding)
|
||||
) {
|
||||
composable("proxy") {
|
||||
S5ClientSwitch(
|
||||
connected = proxyState.connected,
|
||||
loading = proxyState.loading,
|
||||
onSwitch = onSwitch
|
||||
)
|
||||
}
|
||||
composable("monitoring") {
|
||||
Monitoring(initialValue = monitoring) {
|
||||
scope.launch(Dispatchers.IO) {
|
||||
dataStore.edit { settings -> settings[monitoringKey] = it }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalMaterial3Api::class)
|
||||
@@ -301,46 +198,6 @@ fun S5ClientSwitch(
|
||||
}
|
||||
}
|
||||
|
||||
@Composable
|
||||
fun Monitoring(
|
||||
modifier: Modifier = Modifier,
|
||||
initialValue: Boolean,
|
||||
onSwitch: (value: Boolean) -> Unit,
|
||||
) {
|
||||
var monitoring by remember { mutableStateOf(initialValue) }
|
||||
|
||||
Column(
|
||||
modifier = modifier
|
||||
.padding(16.dp)
|
||||
.verticalScroll(rememberScrollState())
|
||||
) {
|
||||
Row(
|
||||
verticalAlignment = Alignment.CenterVertically
|
||||
) {
|
||||
Text("Enable error reporting")
|
||||
Spacer(modifier = modifier.width(16.dp))
|
||||
Switch(checked = monitoring, onCheckedChange = {
|
||||
monitoring = it
|
||||
onSwitch(it)
|
||||
})
|
||||
}
|
||||
Spacer(modifier = modifier.height(18.dp))
|
||||
Row(verticalAlignment = Alignment.CenterVertically) {
|
||||
Icon(
|
||||
painter = painterResource(R.drawable.warning_24),
|
||||
contentDescription = "copy to clipboard",
|
||||
tint = Color.Yellow
|
||||
)
|
||||
Spacer(modifier = modifier.width(16.dp))
|
||||
Text(stringResource(R.string.monitoring_desc_3), color = Color.Yellow)
|
||||
}
|
||||
Spacer(modifier = modifier.height(18.dp))
|
||||
Text(stringResource(R.string.monitoring_desc_1))
|
||||
Spacer(modifier = modifier.height(18.dp))
|
||||
Text(stringResource(R.string.monitoring_desc_2))
|
||||
}
|
||||
}
|
||||
|
||||
@Preview
|
||||
@Composable
|
||||
fun PreviewSocks5Client() {
|
||||
@@ -363,18 +220,3 @@ fun PreviewSocks5Client() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Preview
|
||||
@Composable
|
||||
fun PreviewMonitoring() {
|
||||
NymTheme {
|
||||
Surface(
|
||||
modifier = Modifier.fillMaxSize(),
|
||||
color = MaterialTheme.colorScheme.background
|
||||
) {
|
||||
Monitoring(initialValue = false) {
|
||||
Log.d("Monitoring", "switch $it")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ class MainViewModel(
|
||||
private val workManager: WorkManager,
|
||||
private val nymProxy: NymProxy
|
||||
) : ViewModel() {
|
||||
private val tag = "MainViewModel"
|
||||
private val tag = "viewModel"
|
||||
|
||||
private val workRequest: OneTimeWorkRequest =
|
||||
OneTimeWorkRequestBuilder<ProxyWorker>()
|
||||
@@ -56,7 +56,8 @@ class MainViewModel(
|
||||
// this viewModel instance is cleared
|
||||
// use GlobalScope instead
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
// if the proxy process is still running ie. connected kill it
|
||||
// if the proxy process is still running ie. connected
|
||||
// kill it
|
||||
if (nymProxy.getState() == NymProxy.Companion.State.CONNECTED) {
|
||||
Log.d(tag, "stopping proxy")
|
||||
nymProxy.stop()
|
||||
@@ -130,29 +131,6 @@ class MainViewModel(
|
||||
setDisconnected()
|
||||
}
|
||||
}
|
||||
|
||||
fun checkStateSync() {
|
||||
Log.d(tag, "check state sync")
|
||||
viewModelScope.launch(Dispatchers.IO) {
|
||||
val proxyState = nymProxy.getState()
|
||||
val workInfo = workManager.getWorkInfoById(ProxyWorker.workId).get()
|
||||
Log.d(tag, "proxy state $proxyState, work state ${workInfo?.state}")
|
||||
if (proxyState == NymProxy.Companion.State.CONNECTED &&
|
||||
workInfo?.state != WorkInfo.State.RUNNING
|
||||
) {
|
||||
Log.w(tag, "⚠ state desync")
|
||||
Log.i(tag, "stopping proxy")
|
||||
cancelProxyWork()
|
||||
}
|
||||
if (proxyState == NymProxy.Companion.State.DISCONNECTED &&
|
||||
workInfo?.state == WorkInfo.State.RUNNING
|
||||
) {
|
||||
Log.w(tag, "⚠ state desync")
|
||||
Log.i(tag, "stopping worker")
|
||||
workManager.cancelAllWorkByTag(ProxyWorker.workTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MainViewModelFactory(
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
<vector android:height="24dp" android:tint="#000000"
|
||||
android:viewportHeight="24" android:viewportWidth="24"
|
||||
android:width="24dp" xmlns:android="http://schemas.android.com/apk/res/android">
|
||||
<path android:fillColor="@android:color/white" android:pathData="M1,21h22L12,2 1,21zM13,18h-2v-2h2v2zM13,14h-2v-4h2v4z"/>
|
||||
</vector>
|
||||
@@ -11,8 +11,4 @@
|
||||
<string name="sp_url">https://harbourmaster.nymtech.net/v1/services?size=100</string>
|
||||
<string name="default_sp">DpB3cHAchJiNBQi5FrZx2csXb1mrHkpYh9Wzf8Rjsuko.ANNWrvHqMYuertHGHUrZdBntQhpzfbWekB39qez9U2Vx@2BuMSfMW3zpeAjKXyKLhmY4QW1DXurrtSPEJ6CjX3SEh</string>
|
||||
<string name="connected_text">Connected to the mixnet</string>
|
||||
<string name="monitoring_desc_1">Help Nym developers to fix errors, crashes and improve the application by enabling this option. If errors occur or if the app crashes, it will automatically send a report. Also it tracks various performance metrics. We use sentry.io service to handle this.</string>
|
||||
<string name="monitoring_desc_2">Note: A report can include your external IP, this can be useful to catch issues related to IP location.
|
||||
All recorded data is used by Nym developers and for app development purposes only.</string>
|
||||
<string name="monitoring_desc_3">You must restart the application for the change to take effect.</string>
|
||||
</resources>
|
||||
@@ -1,3 +0,0 @@
|
||||
defaults.project=nym-connect-android
|
||||
defaults.org=nymtech
|
||||
# auth.token=xxx
|
||||
@@ -9,11 +9,9 @@ pub const DEFAULT_HOPS: usize = 4;
|
||||
pub const ROUTING_INFORMATION_LENGTH_BY_STAGE: [u8; DEFAULT_HOPS] =
|
||||
[DEFAULT_ROUTING_INFO_SIZE; DEFAULT_HOPS];
|
||||
pub const MIN_PACKET_SIZE: usize = 48;
|
||||
pub const MAGIC_SLICE: &[u8] = &[111, 102, 120];
|
||||
|
||||
pub const OUTFOX_PACKET_OVERHEAD: usize = MIX_PARAMS_LEN
|
||||
+ (groupelementbytes() + tagbytes() + DEFAULT_ROUTING_INFO_SIZE as usize) * DEFAULT_HOPS
|
||||
+ MAGIC_SLICE.len();
|
||||
+ (groupelementbytes() + tagbytes() + DEFAULT_ROUTING_INFO_SIZE as usize) * DEFAULT_HOPS;
|
||||
|
||||
pub const fn groupelementbytes() -> usize {
|
||||
GROUPELEMENTBYTES as usize
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::constants::MAGIC_SLICE;
|
||||
use crate::constants::MIN_MESSAGE_LEN;
|
||||
use crate::constants::MIX_PARAMS_LEN;
|
||||
use chacha20::cipher::InvalidLength;
|
||||
@@ -26,6 +25,4 @@ pub enum OutfoxError {
|
||||
},
|
||||
#[error("Header length must be {MIX_PARAMS_LEN}, got {0}")]
|
||||
InvalidHeaderLength(usize),
|
||||
#[error("Invalid magic bytes, expected: {:?}, got: {:?}", MAGIC_SLICE, 0)]
|
||||
InvalidMagicBytes(Vec<u8>),
|
||||
}
|
||||
|
||||
+11
-15
@@ -7,7 +7,7 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
constants::{DEFAULT_HOPS, MAGIC_SLICE, MIN_PACKET_SIZE, MIX_PARAMS_LEN},
|
||||
constants::{DEFAULT_HOPS, MIN_PACKET_SIZE, MIX_PARAMS_LEN},
|
||||
error::OutfoxError,
|
||||
format::{MixCreationParameters, MixStageParameters},
|
||||
};
|
||||
@@ -60,19 +60,16 @@ impl TryFrom<&[u8]> for OutfoxPacket {
|
||||
}
|
||||
|
||||
impl OutfoxPacket {
|
||||
pub fn recover_plaintext(&self) -> Result<Vec<u8>, OutfoxError> {
|
||||
pub fn recover_plaintext(&self) -> Vec<u8> {
|
||||
let plaintext = self.payload()[self.payload_range()].to_vec();
|
||||
let mut plaintext = VecDeque::from_iter(plaintext);
|
||||
while let Some(0) = plaintext.front() {
|
||||
plaintext.pop_front();
|
||||
}
|
||||
let mut plaintext = plaintext.make_contiguous().to_vec();
|
||||
let payload = plaintext.split_off(MAGIC_SLICE.len());
|
||||
if plaintext != MAGIC_SLICE {
|
||||
Err(OutfoxError::InvalidMagicBytes(plaintext))
|
||||
} else {
|
||||
Ok(payload)
|
||||
if plaintext.starts_with(&[0]) {
|
||||
let mut plaintext = VecDeque::from_iter(plaintext);
|
||||
while let Some(0) = plaintext.front() {
|
||||
plaintext.pop_front();
|
||||
}
|
||||
return plaintext.make_contiguous().to_vec();
|
||||
}
|
||||
plaintext
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
@@ -103,12 +100,11 @@ impl OutfoxPacket {
|
||||
MIN_PACKET_SIZE
|
||||
} else {
|
||||
packet_size
|
||||
} + MAGIC_SLICE.len();
|
||||
};
|
||||
let mix_params = MixCreationParameters::new(packet_size as u16);
|
||||
|
||||
let padding = mix_params.total_packet_length() - payload.as_ref().len() - MAGIC_SLICE.len();
|
||||
let padding = mix_params.total_packet_length() - payload.as_ref().len();
|
||||
let mut buffer = vec![0; padding];
|
||||
buffer.extend_from_slice(MAGIC_SLICE);
|
||||
buffer.extend_from_slice(payload.as_ref());
|
||||
|
||||
// Last node in the route is a gateway, it will decrypt last, and get the final destination address
|
||||
|
||||
@@ -88,7 +88,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_packet_params_short() {
|
||||
fn test_packet_params() {
|
||||
let (node1_pk, node1_pub) = sphinx_packet::crypto::keygen();
|
||||
let node1 = Node::new(
|
||||
NodeAddressBytes::from_bytes([0u8; NODE_ADDRESS_LENGTH]),
|
||||
@@ -118,7 +118,7 @@ mod tests {
|
||||
|
||||
let route = [node1, node2.clone(), node3.clone(), gateway.clone()];
|
||||
|
||||
let payload = vec![0, 0, 1, 1, 1, 0, 0];
|
||||
let payload = randombytes(21);
|
||||
|
||||
let packet =
|
||||
OutfoxPacket::build(&payload, &route, &destination, Some(payload.len())).unwrap();
|
||||
@@ -140,62 +140,6 @@ mod tests {
|
||||
let destination_address = packet.decode_next_layer(&gateway_pk).unwrap();
|
||||
assert_eq!(destination_address, destination.address.as_bytes());
|
||||
|
||||
assert_eq!(payload, packet.recover_plaintext().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_packet_params_long() {
|
||||
let (node1_pk, node1_pub) = sphinx_packet::crypto::keygen();
|
||||
let node1 = Node::new(
|
||||
NodeAddressBytes::from_bytes([0u8; NODE_ADDRESS_LENGTH]),
|
||||
node1_pub,
|
||||
);
|
||||
let (node2_pk, node2_pub) = sphinx_packet::crypto::keygen();
|
||||
let node2 = Node::new(
|
||||
NodeAddressBytes::from_bytes([1u8; NODE_ADDRESS_LENGTH]),
|
||||
node2_pub,
|
||||
);
|
||||
let (node3_pk, node3_pub) = sphinx_packet::crypto::keygen();
|
||||
let node3 = Node::new(
|
||||
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
|
||||
node3_pub,
|
||||
);
|
||||
|
||||
let (gateway_pk, gateway_pub) = sphinx_packet::crypto::keygen();
|
||||
let gateway = Node::new(
|
||||
NodeAddressBytes::from_bytes([3u8; NODE_ADDRESS_LENGTH]),
|
||||
gateway_pub,
|
||||
);
|
||||
|
||||
let destination = Destination::new(
|
||||
DestinationAddressBytes::from_bytes([9u8; NODE_ADDRESS_LENGTH]),
|
||||
[0u8; 16],
|
||||
);
|
||||
|
||||
let route = [node1, node2.clone(), node3.clone(), gateway.clone()];
|
||||
|
||||
let payload = randombytes(2048);
|
||||
|
||||
let packet =
|
||||
OutfoxPacket::build(&payload, &route, &destination, Some(payload.len())).unwrap();
|
||||
let packet_bytes = packet.to_bytes().unwrap();
|
||||
println!(
|
||||
"packet bytes length, {}, declared {}",
|
||||
packet_bytes.len(),
|
||||
packet.len()
|
||||
);
|
||||
|
||||
let mut packet = OutfoxPacket::try_from(packet_bytes.as_slice()).unwrap();
|
||||
|
||||
let next_address = packet.decode_next_layer(&node1_pk).unwrap();
|
||||
assert_eq!(next_address, node2.address.as_bytes());
|
||||
let next_address = packet.decode_next_layer(&node2_pk).unwrap();
|
||||
assert_eq!(next_address, node3.address.as_bytes());
|
||||
let next_address = packet.decode_next_layer(&node3_pk).unwrap();
|
||||
assert_eq!(next_address, gateway.address.as_bytes());
|
||||
let destination_address = packet.decode_next_layer(&gateway_pk).unwrap();
|
||||
assert_eq!(destination_address, destination.address.as_bytes());
|
||||
|
||||
assert_eq!(payload, packet.recover_plaintext().unwrap());
|
||||
assert_eq!(payload, packet.recover_plaintext());
|
||||
}
|
||||
}
|
||||
|
||||
Generated
+2
-13
@@ -2915,7 +2915,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-bin-common"
|
||||
version = "0.6.0"
|
||||
version = "0.5.0"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"clap",
|
||||
@@ -3220,7 +3220,6 @@ dependencies = [
|
||||
"nym-service-provider-directory-common",
|
||||
"nym-vesting-contract",
|
||||
"nym-vesting-contract-common",
|
||||
"openssl",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -3298,7 +3297,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym_wallet"
|
||||
version = "1.2.5"
|
||||
version = "1.2.4"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"base64 0.13.1",
|
||||
@@ -3448,15 +3447,6 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-src"
|
||||
version = "111.26.0+1.1.1u"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efc62c9f12b22b8f5208c23a7200a442b2e5999f8bdf80233852122b5a4f6f37"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.80"
|
||||
@@ -3466,7 +3456,6 @@ dependencies = [
|
||||
"autocfg",
|
||||
"cc",
|
||||
"libc",
|
||||
"openssl-src",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
{
|
||||
"name": "@nymproject/nym-client-wasm",
|
||||
"version": "1.0.0",
|
||||
"sideEffects": false
|
||||
}
|
||||
@@ -24,7 +24,6 @@
|
||||
"prebuild": "yarn build:dependencies",
|
||||
"build": "yarn build:only-this",
|
||||
"build:only-this": "scripts/build-prod.sh",
|
||||
"prebuild:dev": "yarn build:dependencies",
|
||||
"build:dev": "yarn build:dev:only-this",
|
||||
"build:dev:only-this": "scripts/build.sh"
|
||||
},
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
use nym_cli_commands::context::ClientArgs;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
|
||||
pub(crate) async fn execute(
|
||||
_global_args: ClientArgs,
|
||||
identity_key: nym_cli_commands::validator::mixnet::operators::identity_key::MixnetOperatorsIdentityKey,
|
||||
_network_details: &NymNetworkDetails,
|
||||
) -> anyhow::Result<()> {
|
||||
let res = match identity_key.command {
|
||||
nym_cli_commands::validator::mixnet::operators::identity_key::MixnetOperatorsIdentityKeyCommands::Sign(sign_args) => {
|
||||
nym_cli_commands::validator::mixnet::operators::identity_key::sign(sign_args).await
|
||||
}
|
||||
};
|
||||
Ok(res?)
|
||||
}
|
||||
@@ -5,7 +5,6 @@ use nym_cli_commands::context::ClientArgs;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
|
||||
pub(crate) mod gateways;
|
||||
pub(crate) mod identity_key;
|
||||
pub(crate) mod mixnodes;
|
||||
pub(crate) mod name;
|
||||
pub(crate) mod services;
|
||||
@@ -24,6 +23,5 @@ pub(crate) async fn execute(
|
||||
) => mixnodes::execute(global_args, mixnode, network_details).await,
|
||||
nym_cli_commands::validator::mixnet::operators::MixnetOperatorsCommands::ServiceProvider(service) => services::execute(global_args, service, network_details).await,
|
||||
nym_cli_commands::validator::mixnet::operators::MixnetOperatorsCommands::Name(name) => name::execute(global_args, name, network_details).await,
|
||||
nym_cli_commands::validator::mixnet::operators::MixnetOperatorsCommands::IdentityKey(identity_key) => identity_key::execute(global_args, identity_key, network_details).await,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user