Compare commits

...

17 Commits

Author SHA1 Message Date
Jon Häggblad a9428d4f7b wip 2023-02-06 17:32:40 +01:00
Jon Häggblad 067ee0db74 wip 2023-02-06 17:18:33 +01:00
Jon Häggblad e85974d9a5 wip 2023-02-06 15:01:34 +01:00
Jon Häggblad 000100852b wip 2023-02-06 10:58:27 +01:00
Jon Häggblad f66110eaa9 minimal poc 2023-02-05 17:10:04 +01:00
Mark Sinclair 78aa07360c Update build-and-upload-binaries-ci.yml 2023-02-03 17:10:13 +00:00
Mark Sinclair 4e52478c7a Update build-and-upload-binaries-ci.yml 2023-02-03 16:54:49 +00:00
Mark Sinclair 7c77665a37 Update build-and-upload-binaries-ci.yml 2023-02-03 16:36:44 +00:00
Mark Sinclair 72880b9764 Update build-and-upload-binaries-ci.yml 2023-02-03 16:14:29 +00:00
Mark Sinclair fd1c0ae62b Update build-and-upload-binaries-ci.yml 2023-02-03 12:03:13 +00:00
Mark Sinclair bd7399091b Update build-and-upload-binaries-ci.yml 2023-02-03 11:57:32 +00:00
Mark Sinclair 00fad44e2e Update build-and-upload-binaries-ci.yml 2023-02-03 11:46:07 +00:00
Mark Sinclair ea33c332ee GitHub Actions: add action to build and upload binaries to CI server 2023-02-03 11:35:03 +00:00
Bogdan-Ștefan Neacșu b39f8af8d0 Fix flaky dkg test 2023-02-03 12:29:21 +02:00
dependabot[bot] a400463b7e build(deps): bump ua-parser-js in /nym-wallet/webdriver (#2909)
Bumps [ua-parser-js](https://github.com/faisalman/ua-parser-js) from 0.7.28 to 0.7.33.
- [Release notes](https://github.com/faisalman/ua-parser-js/releases)
- [Changelog](https://github.com/faisalman/ua-parser-js/blob/master/changelog.md)
- [Commits](https://github.com/faisalman/ua-parser-js/compare/0.7.28...0.7.33)

---
updated-dependencies:
- dependency-name: ua-parser-js
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-02-03 10:00:09 +00:00
dependabot[bot] 82928af64c build(deps): bump http-cache-semantics in /nym-wallet/webdriver (#2960)
Bumps [http-cache-semantics](https://github.com/kornelski/http-cache-semantics) from 4.1.0 to 4.1.1.
- [Release notes](https://github.com/kornelski/http-cache-semantics/releases)
- [Commits](https://github.com/kornelski/http-cache-semantics/commits)

---
updated-dependencies:
- dependency-name: http-cache-semantics
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-02-03 09:59:08 +00:00
dependabot[bot] c5891f546c build(deps): bump http-cache-semantics from 4.1.0 to 4.1.1 (#2961)
Bumps [http-cache-semantics](https://github.com/kornelski/http-cache-semantics) from 4.1.0 to 4.1.1.
- [Release notes](https://github.com/kornelski/http-cache-semantics/releases)
- [Commits](https://github.com/kornelski/http-cache-semantics/commits)

---
updated-dependencies:
- dependency-name: http-cache-semantics
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-02-03 09:58:34 +00:00
18 changed files with 229 additions and 280 deletions
@@ -0,0 +1,113 @@
name: Build and upload binaries to CI
on:
workflow_dispatch:
push:
paths:
- 'clients/**'
- 'common/**'
- 'contracts/**'
- 'explorer-api/**'
- 'gateway/**'
- 'integrations/**'
- 'mixnode/**'
- 'sdk/rust/nym-sdk/**'
- 'service-providers/**'
- 'nym-api/**'
- 'nym-outfox/**'
- 'tools/nym-cli/**'
- 'tools/ts-rs-cli/**'
pull_request:
paths:
- 'clients/**'
- 'common/**'
- 'contracts/**'
- 'explorer-api/**'
- 'gateway/**'
- 'integrations/**'
- 'mixnode/**'
- 'sdk/rust/nym-sdk/**'
- 'service-providers/**'
- 'nym-api/**'
- 'nym-outfox/**'
- 'tools/nym-cli/**'
- 'tools/ts-rs-cli/**'
env:
NETWORK: mainnet
jobs:
publish-nym:
strategy:
fail-fast: false
matrix:
platform: [ubuntu-20.04]
runs-on: ${{ matrix.platform }}
steps:
- uses: actions/checkout@v3
- name: Prepare build output directory
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: |
rm -rf ci-builds || true
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools
continue-on-error: true
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --release
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
components: rustfmt, clippy
- name: Build release contracts
run: make wasm
- name: Prepare build output
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: |
cp target/release/nym-client $OUTPUT_DIR
cp target/release/nym-gateway $OUTPUT_DIR
cp target/release/nym-mixnode $OUTPUT_DIR
cp target/release/nym-socks5-client $OUTPUT_DIR
cp target/release/nym-api $OUTPUT_DIR
cp target/release/nym-network-requester $OUTPUT_DIR
cp target/release/nym-network-statistics $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm $OUTPUT_DIR
- name: Deploy branch to CI www
continue-on-error: true
uses: easingthemes/ssh-deploy@main
env:
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
ARGS: "-avzr"
SOURCE: "ci-builds/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/builds/
EXCLUDE: "/dist/, /node_modules/"
Generated
+1
View File
@@ -3743,6 +3743,7 @@ dependencies = [
"log",
"logging",
"network-defaults",
"nym-sdk",
"nymsphinx",
"ordered-buffer",
"pretty_env_logger",
@@ -51,11 +51,22 @@ pub mod non_wasm_helpers;
pub mod helpers;
#[derive(Clone)]
pub struct ClientInput {
pub connection_command_sender: ConnectionCommandSender,
pub input_sender: InputMessageSender,
}
impl ClientInput {
pub async fn send(
&self,
message: InputMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
self.input_sender.send(message).await
}
}
#[derive(Clone)]
pub struct ClientOutput {
pub received_buffer_request_sender: ReceivedBufferRequestSender,
}
@@ -342,7 +342,6 @@ where
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
@@ -421,7 +420,6 @@ where
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
+2 -2
View File
@@ -10,7 +10,7 @@ use client_core::client::{
};
use log::*;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
use proxy_helpers::connection_controller::Controller;
use std::net::SocketAddr;
use tap::TapFallible;
use task::TaskClient;
@@ -69,7 +69,7 @@ impl SphinxSocksServer {
// controller for managing all active connections
let (mut active_streams_controller, controller_sender) = Controller::new(
client_connection_tx,
BroadcastActiveConnections::Off,
//BroadcastActiveConnections::Off,
self.shutdown.clone(),
);
tokio::spawn(async move {
-6
View File
@@ -25,12 +25,6 @@ pub enum ConnectionCommand {
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
// transmission lanes.
Close(ConnectionId),
// In the network requester for example, we usually want to broadcast active connections
// regularly, so we know what connections we need to request lane queue lengths for from the
// client.
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
ActiveConnections(Vec<ConnectionId>),
}
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
@@ -7,12 +7,8 @@ use futures::StreamExt;
use log::*;
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
use socks5_requests::ConnectionId;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use std::collections::{HashMap, HashSet};
use task::TaskClient;
use tokio::time;
/// A generic message produced after reading from a socket/connection. It includes data that was
/// actually read alongside boolean indicating whether the connection got closed so that
@@ -87,10 +83,6 @@ pub struct Controller {
// Broadcast closed connections
client_connection_tx: ConnectionCommandSender,
// The controller can broadcast active connections. This is useful in the network-requester
// where its used to query the client for lane queue lengths
broadcast_connections: BroadcastActiveConnections,
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
// how to handle it more gracefully
@@ -104,7 +96,6 @@ pub struct Controller {
impl Controller {
pub fn new(
client_connection_tx: ConnectionCommandSender,
broadcast_connections: BroadcastActiveConnections,
shutdown: TaskClient,
) -> (Self, ControllerSender) {
let (sender, receiver) = mpsc::unbounded();
@@ -114,7 +105,6 @@ impl Controller {
receiver,
recently_closed: HashSet::new(),
client_connection_tx,
broadcast_connections,
pending_messages: HashMap::new(),
shutdown,
},
@@ -165,15 +155,6 @@ impl Controller {
}
}
fn broadcast_active_connections(&mut self) {
// What about the recently closed ones? Hopefully we can ignore them ...
let conn_ids = self.active_connections.keys().copied().collect();
self.client_connection_tx
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
.unwrap();
}
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
if !payload.is_empty() {
@@ -230,8 +211,6 @@ impl Controller {
}
pub async fn run(&mut self) {
let mut interval = time::interval(Duration::from_millis(500));
loop {
tokio::select! {
command = self.receiver.next() => match command {
@@ -247,11 +226,6 @@ impl Controller {
break;
}
},
_ = interval.tick() => {
if self.broadcast_connections == BroadcastActiveConnections::On {
self.broadcast_active_connections();
}
},
}
}
self.shutdown.recv_timeout().await;
+13 -3
View File
@@ -59,7 +59,7 @@ pub(crate) mod tests {
use coconut_dkg_common::dealer::DealerDetails;
use cosmwasm_std::Addr;
use dkg::bte::keys::KeyPair as DkgKeyPair;
use dkg::bte::Params;
use dkg::bte::{Params, PublicKeyWithProof};
use rand::rngs::OsRng;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -186,8 +186,18 @@ pub(crate) mod tests {
let mut bytes = bs58::decode(details.bte_public_key_with_proof.clone())
.into_vec()
.unwrap();
let last_byte = bytes.last_mut().unwrap();
*last_byte += 1;
// Find another value for last byte that still deserializes to a public key with proof
let initial_byte = *bytes.last_mut().unwrap();
loop {
let last_byte = bytes.last_mut().unwrap();
let (ret, _) = last_byte.overflowing_add(1);
*last_byte = ret;
// stop when we find that value, or if we do a full round trip of u8 values
// and can't find one, in which case this test is invalid
if PublicKeyWithProof::try_from_bytes(&bytes).is_ok() || ret == initial_byte {
break;
}
}
details.bte_public_key_with_proof = bs58::encode(&bytes).into_string();
});
+6 -6
View File
@@ -1312,9 +1312,9 @@ hosted-git-info@^2.1.4:
integrity sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==
http-cache-semantics@^4.0.0:
version "4.1.0"
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.0.tgz#49e91c5cbf36c9b94bcfcd71c23d5249ec74e390"
integrity sha512-carPklcUh7ROWRK7Cv27RPtdhYhUsela/ue5/jKzjegVvXDqM2ILE9Q2BGn9JZJh1g87cp56su/FgQSzcWS8cQ==
version "4.1.1"
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.1.tgz#abe02fcb2985460bf0323be664436ec3476a6d5a"
integrity sha512-er295DKPVsV82j5kw1Gjt+ADA/XYHsajl82cGNQG2eyoPkvgUhX+nDIyelzhIWbbsXP39EHcI6l5tYs2FYqYXQ==
http2-wrapper@^1.0.0-beta.5.2:
version "1.0.3"
@@ -2471,9 +2471,9 @@ type-fest@^0.21.3:
integrity sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==
ua-parser-js@^0.7.21:
version "0.7.28"
resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.28.tgz#8ba04e653f35ce210239c64661685bf9121dec31"
integrity sha512-6Gurc1n//gjp9eQNXjD9O3M/sMwVtN5S8Lv9bvOYBfKfDNiIIhqiyi01vMBO45u4zkDE420w/e0se7Vs+sIg+g==
version "0.7.33"
resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.33.tgz#1d04acb4ccef9293df6f70f2c3d22f3030d8b532"
integrity sha512-s8ax/CeZdK9R/56Sui0WM6y9OFREJarMRHqLB2EwkovemBxNQ+Bqu8GAsUnVcXKgphb++ghr/B2BZx4mahujPw==
unbzip2-stream@1.3.3:
version "1.3.3"
+2 -2
View File
@@ -36,8 +36,8 @@ mod connection_state;
mod keys;
mod paths;
pub use client::{MixnetClient, MixnetClientBuilder};
pub use client_core::config::GatewayEndpointConfig;
pub use client::{MixnetClient, MixnetClientBuilder, MixnetClientSender};
pub use client_core::{client::inbound_messages::InputMessage, config::GatewayEndpointConfig};
pub use config::Config;
pub use keys::{Keys, KeysArc};
pub use nymsphinx::{
+36 -9
View File
@@ -268,7 +268,6 @@ pub struct MixnetClient {
/// The current state of the client that is exposed to the user. This includes things like
/// current message send queue length.
#[allow(dead_code)]
client_state: ClientState,
/// A channel for messages arriving from the mixnet after they have been reconstructed.
@@ -421,6 +420,23 @@ impl MixnetClient {
&self.nym_address
}
/// Get a shallow clone of [`MixnetClientSender`]
pub fn sender(&self) -> MixnetClientSender {
MixnetClientSender {
client_input: self.client_input.clone(),
}
}
/// Get a shallow clone of [`ConnectionCommandSender`].
pub fn connection_command_sender(&self) -> client_connections::ConnectionCommandSender {
self.client_input.connection_command_sender.clone()
}
/// Get a shallow clone of [`LaneQueueLengths`].
pub fn shared_lane_queue_lengths(&self) -> client_connections::LaneQueueLengths {
self.client_state.shared_lane_queue_lengths.clone()
}
/// Sends stringy data to the supplied Nym address
pub async fn send_str(&self, address: Recipient, message: &str) {
let message_bytes = message.to_string().into_bytes();
@@ -430,7 +446,7 @@ impl MixnetClient {
/// Sends stringy data to the supplied Nym address, and skip sending reply-SURBs
pub async fn send_str_direct(&self, address: Recipient, message: &str) {
let message_bytes = message.to_string().into_bytes();
self.send_bytes(address, message_bytes).await;
self.send_bytes_direct(address, message_bytes).await;
}
/// Sends bytes to the supplied Nym address
@@ -451,13 +467,12 @@ impl MixnetClient {
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>) {
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_anonymous(address, message, 20, lane);
if self
.client_input
.input_sender
.send(input_msg)
.await
.is_err()
{
self.send_input_message(input_msg).await
}
/// Sends a [`InputMessage`] to the mixnet.
async fn send_input_message(&self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
}
}
@@ -501,3 +516,15 @@ impl MixnetClient {
self.task_manager.wait_for_shutdown().await;
}
}
pub struct MixnetClientSender {
client_input: ClientInput,
}
impl MixnetClientSender {
pub async fn send_input_message(&mut self, message: InputMessage) {
if self.client_input.send(message).await.is_err() {
log::error!("Failed to send message");
}
}
}
@@ -31,9 +31,10 @@ tokio-tungstenite = "0.17.2"
# internal
client-connections = { path = "../../common/client-connections" }
completions = { path = "../../common/completions" }
network-defaults = { path = "../../common/network-defaults" }
nymsphinx = { path = "../../common/nymsphinx" }
logging = { path = "../../common/logging"}
network-defaults = { path = "../../common/network-defaults" }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nymsphinx = { path = "../../common/nymsphinx" }
ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
socks5-requests = { path = "../../common/socks5/requests" }
+28 -158
View File
@@ -4,22 +4,16 @@ use crate::allowed_hosts;
use crate::allowed_hosts::OutboundRequestFilter;
use crate::error::NetworkRequesterError;
use crate::statistics::ServiceStatisticsCollector;
use crate::websocket;
use crate::websocket::TSWebsocketStream;
use crate::{reply, socks5};
use client_connections::{
ConnectionCommand, ConnectionCommandReceiver, LaneQueueLengths, TransmissionLane,
};
use client_connections::LaneQueueLengths;
use futures::channel::mpsc;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
use nymsphinx::receiver::ReconstructedMessage;
use proxy_helpers::connection_controller::{
BroadcastActiveConnections, Controller, ControllerCommand, ControllerSender,
use proxy_helpers::{
connection_controller::{Controller, ControllerCommand, ControllerSender},
proxy_runner::{MixProxyReader, MixProxySender},
};
use proxy_helpers::proxy_runner::{MixProxyReader, MixProxySender};
use socks5_requests::{
ConnectRequest, ConnectionId, Message as Socks5Message, NetworkRequesterResponse, Request,
Response,
@@ -28,14 +22,11 @@ use statistics_common::collector::StatisticsSender;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use task::TaskClient;
use tokio_tungstenite::tungstenite::protocol::Message;
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
// Since it's an atomic, it's safe to be kept static and shared across threads
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
pub struct ServiceProvider {
websocket_address: String,
outbound_request_filter: OutboundRequestFilter,
open_proxy: bool,
enable_statistics: bool,
@@ -44,7 +35,6 @@ pub struct ServiceProvider {
impl ServiceProvider {
pub async fn new(
websocket_address: String,
open_proxy: bool,
enable_statistics: bool,
stats_provider_addr: Option<Recipient>,
@@ -67,7 +57,6 @@ impl ServiceProvider {
let outbound_request_filter = OutboundRequestFilter::new(allowed_hosts, unknown_hosts);
ServiceProvider {
websocket_address,
outbound_request_filter,
open_proxy,
enable_statistics,
@@ -78,10 +67,9 @@ impl ServiceProvider {
/// Listens for any messages from `mix_reader` that should be written back to the mix network
/// via the `websocket_writer`.
async fn mixnet_response_listener(
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
mut mix_reader: MixProxyReader<(Socks5Message, reply::ReturnAddress)>,
stats_collector: Option<ServiceStatisticsCollector>,
mut client_connection_rx: ConnectionCommandReceiver,
) {
loop {
tokio::select! {
@@ -102,102 +90,19 @@ impl ServiceProvider {
}
}
// make 'request' to native-websocket client
let conn_id = msg.conn_id();
let response_message = return_address.send_back_to(msg.into_bytes(), conn_id);
let message = Message::Binary(response_message.serialize());
websocket_writer.send(message).await.unwrap();
mixnet_client_sender.send_input_message(response_message).await;
} else {
log::error!("Exiting: channel closed!");
break;
}
},
Some(command) = client_connection_rx.next() => {
match command {
ConnectionCommand::Close(id) => {
let msg = ClientRequest::ClosedConnection(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
ConnectionCommand::ActiveConnections(ids) => {
// We can optimize this by sending a single request, but this is
// usually in the low single digits, max a few tens, so we leave that
// for a rainy day.
// Also that means fiddling with the currently manual
// serialize/deserialize we do with ClientRequests ...
for id in ids {
log::trace!("Requesting lane queue length for: {}", id);
let msg = ClientRequest::GetLaneQueueLength(id);
let ws_msg = Message::Binary(msg.serialize());
websocket_writer.send(ws_msg).await.unwrap();
}
}
}
},
}
}
}
fn handle_lane_queue_length_response(
lane_queue_lengths: &LaneQueueLengths,
lane: u64,
queue_length: usize,
) {
log::trace!("Received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
let lane = TransmissionLane::ConnectionId(lane);
lane_queue_lengths.map.insert(lane, queue_length);
} else {
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
}
}
async fn read_websocket_message(
websocket_reader: &mut SplitStream<TSWebsocketStream>,
lane_queue_lengths: LaneQueueLengths,
) -> Option<ReconstructedMessage> {
while let Some(msg) = websocket_reader.next().await {
let data = match msg {
Ok(msg) => msg.into_data(),
Err(err) => {
log::error!("Failed to read from the websocket: {err}");
continue;
}
};
// try to recover the actual message from the mix network...
let deserialized_message = match ServerResponse::deserialize(&data) {
Ok(deserialized) => deserialized,
Err(err) => {
log::error!(
"Failed to deserialize received websocket message! - {}",
err
);
continue;
}
};
let received = match deserialized_message {
ServerResponse::Received(received) => received,
ServerResponse::LaneQueueLength { lane, queue_length } => {
Self::handle_lane_queue_length_response(
&lane_queue_lengths,
lane,
queue_length,
);
continue;
}
ServerResponse::Error(err) => {
panic!("received error from native client! - {err}")
}
_ => unimplemented!("probably should never be reached?"),
};
return Some(received);
}
None
}
async fn start_proxy(
conn_id: ConnectionId,
remote_addr: String,
@@ -395,10 +300,8 @@ impl ServiceProvider {
/// Start all subsystems
pub async fn run(&mut self) -> Result<(), NetworkRequesterError> {
let websocket_stream = self.connect_websocket(&self.websocket_address).await?;
// split the websocket so that we could read and write from separate threads
let (websocket_writer, mut websocket_reader) = websocket_stream.split();
// Connect to the mixnet
let mut mixnet_client = nym_sdk::mixnet::MixnetClient::connect().await.unwrap();
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
// going to be used by `mixnet_response_listener`
@@ -408,21 +311,9 @@ impl ServiceProvider {
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
let shutdown = task::TaskManager::default();
// Channel for announcing client connection state by the controller.
// The `mixnet_response_listener` will use this to either report closed connection to the
// client or request lane queue lengths.
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
// Controller for managing all active connections.
// We provide it with a ShutdownListener since it requires it, even though for the network
// requester shutdown signalling is not yet fully implemented.
let (mut active_connections_controller, mut controller_sender) = Controller::new(
client_connection_tx,
BroadcastActiveConnections::On,
mixnet_client.connection_command_sender(),
shutdown.subscribe(),
);
@@ -446,58 +337,37 @@ impl ServiceProvider {
};
let stats_collector_clone = stats_collector.clone();
let mixnet_client_sender = mixnet_client.sender();
// start the listener for mix messages
tokio::spawn(async move {
Self::mixnet_response_listener(
websocket_writer,
mixnet_client_sender,
mix_input_receiver,
stats_collector_clone,
client_connection_rx,
)
.await;
});
let nym_address = mixnet_client.nym_address();
log::info!("Our nym address is: {nym_address}");
log::info!("All systems go. Press CTRL-C to stop the server.");
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
loop {
let Some(received) = Self::read_websocket_message(
&mut websocket_reader,
shared_lane_queue_lengths.clone()
while let Some(received) = mixnet_client.wait_for_messages().await {
for received in received {
self.handle_proxy_message(
received,
&mut controller_sender,
&mix_input_sender,
mixnet_client.shared_lane_queue_lengths(),
stats_collector.clone(),
shutdown.subscribe(),
)
.await
else {
log::error!("The websocket stream has finished!");
return Err(NetworkRequesterError::ConnectionClosed);
};
self.handle_proxy_message(
received,
&mut controller_sender,
&mix_input_sender,
shared_lane_queue_lengths.clone(),
stats_collector.clone(),
shutdown.subscribe(),
)
.await;
}
}
// Make the websocket connection so we can receive incoming Mixnet messages.
async fn connect_websocket(
&self,
uri: &str,
) -> Result<TSWebsocketStream, NetworkRequesterError> {
match websocket::Connection::new(uri).connect().await {
Ok(ws_stream) => {
log::info!("* connected to local websocket server at {}", uri);
Ok(ws_stream)
}
Err(err) => {
log::error!(
"Error: websocket connection attempt failed, is the Nym client running?"
);
Err(err.into())
.await;
}
}
log::error!("Network requester exited unexpectedly");
Ok(())
}
}
@@ -1,13 +1,5 @@
use crate::websocket::WebsocketConnectionError;
#[derive(thiserror::Error, Debug)]
pub enum NetworkRequesterError {
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),
#[error("Websocket error")]
WebsocketConnectionError(#[from] WebsocketConnectionError),
#[error("Websocket connection closed")]
ConnectionClosed,
}
@@ -16,7 +16,6 @@ mod error;
mod reply;
mod socks5;
mod statistics;
mod websocket;
const ENABLE_STATISTICS: &str = "enable-statistics";
@@ -56,16 +55,8 @@ impl Run {
.transpose()
.unwrap_or(None);
let websocket_address = format!(
"ws://localhost:{}",
self.websocket_port
.as_ref()
.unwrap_or(&network_defaults::DEFAULT_WEBSOCKET_LISTENING_PORT.to_string())
);
log::info!("Starting socks5 service provider");
let mut server = core::ServiceProvider::new(
websocket_address,
self.open_proxy,
self.enable_statistics,
stats_provider_addr,
@@ -1,6 +1,7 @@
use client_connections::TransmissionLane;
use nym_sdk::mixnet::InputMessage;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
use websocket_requests::requests::ClientRequest;
/// A return address is a way to send a message back to the original sender. It can be either
/// an explicitly known Recipient, or a surb AnonymousSenderTag.
@@ -24,17 +25,17 @@ impl ReturnAddress {
None
}
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> ClientRequest {
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> InputMessage {
match self {
ReturnAddress::Known(recipient) => ClientRequest::Send {
ReturnAddress::Known(recipient) => InputMessage::Regular {
recipient: *recipient,
message,
connection_id: Some(connection_id),
data: message,
lane: TransmissionLane::ConnectionId(connection_id),
},
ReturnAddress::Anonymous(sender_tag) => ClientRequest::Reply {
message,
sender_tag,
connection_id: Some(connection_id),
ReturnAddress::Anonymous(sender_tag) => InputMessage::Reply {
recipient_tag: sender_tag,
data: message,
lane: TransmissionLane::ConnectionId(connection_id),
},
}
}
@@ -1,34 +0,0 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use tokio::net::TcpStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream};
#[allow(clippy::upper_case_acronyms)]
pub(crate) type TSWebsocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
pub struct Connection {
uri: String,
}
impl Connection {
pub fn new(uri: &str) -> Connection {
Connection {
uri: String::from(uri),
}
}
pub async fn connect(&self) -> Result<TSWebsocketStream, WebsocketConnectionError> {
match connect_async(&self.uri).await {
Ok((ws_stream, _)) => Ok(ws_stream),
Err(e) => Err(WebsocketConnectionError::ConnectionNotEstablished(e)),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum WebsocketConnectionError {
#[error("Connection not established")]
ConnectionNotEstablished(tokio_tungstenite::tungstenite::Error),
}
+3 -3
View File
@@ -10855,9 +10855,9 @@ htmlparser2@^6.1.0:
entities "^2.0.0"
http-cache-semantics@^4.1.0:
version "4.1.0"
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.0.tgz#49e91c5cbf36c9b94bcfcd71c23d5249ec74e390"
integrity sha512-carPklcUh7ROWRK7Cv27RPtdhYhUsela/ue5/jKzjegVvXDqM2ILE9Q2BGn9JZJh1g87cp56su/FgQSzcWS8cQ==
version "4.1.1"
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.1.tgz#abe02fcb2985460bf0323be664436ec3476a6d5a"
integrity sha512-er295DKPVsV82j5kw1Gjt+ADA/XYHsajl82cGNQG2eyoPkvgUhX+nDIyelzhIWbbsXP39EHcI6l5tYs2FYqYXQ==
http-deceiver@^1.2.7:
version "1.2.7"