Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a9428d4f7b | |||
| 067ee0db74 | |||
| e85974d9a5 | |||
| 000100852b | |||
| f66110eaa9 | |||
| 78aa07360c | |||
| 4e52478c7a | |||
| 7c77665a37 | |||
| 72880b9764 | |||
| fd1c0ae62b | |||
| bd7399091b | |||
| 00fad44e2e | |||
| ea33c332ee | |||
| b39f8af8d0 | |||
| a400463b7e | |||
| 82928af64c | |||
| c5891f546c |
@@ -0,0 +1,113 @@
|
||||
name: Build and upload binaries to CI
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
paths:
|
||||
- 'clients/**'
|
||||
- 'common/**'
|
||||
- 'contracts/**'
|
||||
- 'explorer-api/**'
|
||||
- 'gateway/**'
|
||||
- 'integrations/**'
|
||||
- 'mixnode/**'
|
||||
- 'sdk/rust/nym-sdk/**'
|
||||
- 'service-providers/**'
|
||||
- 'nym-api/**'
|
||||
- 'nym-outfox/**'
|
||||
- 'tools/nym-cli/**'
|
||||
- 'tools/ts-rs-cli/**'
|
||||
pull_request:
|
||||
paths:
|
||||
- 'clients/**'
|
||||
- 'common/**'
|
||||
- 'contracts/**'
|
||||
- 'explorer-api/**'
|
||||
- 'gateway/**'
|
||||
- 'integrations/**'
|
||||
- 'mixnode/**'
|
||||
- 'sdk/rust/nym-sdk/**'
|
||||
- 'service-providers/**'
|
||||
- 'nym-api/**'
|
||||
- 'nym-outfox/**'
|
||||
- 'tools/nym-cli/**'
|
||||
- 'tools/ts-rs-cli/**'
|
||||
|
||||
env:
|
||||
NETWORK: mainnet
|
||||
|
||||
jobs:
|
||||
publish-nym:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
platform: [ubuntu-20.04]
|
||||
|
||||
runs-on: ${{ matrix.platform }}
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Prepare build output directory
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
|
||||
run: |
|
||||
rm -rf ci-builds || true
|
||||
mkdir -p $OUTPUT_DIR
|
||||
echo $OUTPUT_DIR
|
||||
|
||||
- name: Install Dependencies (Linux)
|
||||
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools
|
||||
continue-on-error: true
|
||||
|
||||
- name: Install Rust stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
|
||||
- name: Build all binaries
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: build
|
||||
args: --workspace --release
|
||||
|
||||
- name: Install Rust stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
target: wasm32-unknown-unknown
|
||||
override: true
|
||||
components: rustfmt, clippy
|
||||
|
||||
- name: Build release contracts
|
||||
run: make wasm
|
||||
|
||||
- name: Prepare build output
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
|
||||
run: |
|
||||
cp target/release/nym-client $OUTPUT_DIR
|
||||
cp target/release/nym-gateway $OUTPUT_DIR
|
||||
cp target/release/nym-mixnode $OUTPUT_DIR
|
||||
cp target/release/nym-socks5-client $OUTPUT_DIR
|
||||
cp target/release/nym-api $OUTPUT_DIR
|
||||
cp target/release/nym-network-requester $OUTPUT_DIR
|
||||
cp target/release/nym-network-statistics $OUTPUT_DIR
|
||||
cp target/release/nym-cli $OUTPUT_DIR
|
||||
|
||||
cp contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm $OUTPUT_DIR
|
||||
|
||||
- name: Deploy branch to CI www
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-avzr"
|
||||
SOURCE: "ci-builds/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/builds/
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
|
||||
Generated
+1
@@ -3743,6 +3743,7 @@ dependencies = [
|
||||
"log",
|
||||
"logging",
|
||||
"network-defaults",
|
||||
"nym-sdk",
|
||||
"nymsphinx",
|
||||
"ordered-buffer",
|
||||
"pretty_env_logger",
|
||||
|
||||
@@ -51,11 +51,22 @@ pub mod non_wasm_helpers;
|
||||
|
||||
pub mod helpers;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientInput {
|
||||
pub connection_command_sender: ConnectionCommandSender,
|
||||
pub input_sender: InputMessageSender,
|
||||
}
|
||||
|
||||
impl ClientInput {
|
||||
pub async fn send(
|
||||
&self,
|
||||
message: InputMessage,
|
||||
) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
|
||||
self.input_sender.send(message).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientOutput {
|
||||
pub received_buffer_request_sender: ReceivedBufferRequestSender,
|
||||
}
|
||||
|
||||
@@ -342,7 +342,6 @@ where
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -421,7 +420,6 @@ where
|
||||
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
|
||||
match id {
|
||||
ConnectionCommand::Close(id) => self.on_close_connection(id),
|
||||
ConnectionCommand::ActiveConnections(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use client_core::client::{
|
||||
};
|
||||
use log::*;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
|
||||
use proxy_helpers::connection_controller::Controller;
|
||||
use std::net::SocketAddr;
|
||||
use tap::TapFallible;
|
||||
use task::TaskClient;
|
||||
@@ -69,7 +69,7 @@ impl SphinxSocksServer {
|
||||
// controller for managing all active connections
|
||||
let (mut active_streams_controller, controller_sender) = Controller::new(
|
||||
client_connection_tx,
|
||||
BroadcastActiveConnections::Off,
|
||||
//BroadcastActiveConnections::Off,
|
||||
self.shutdown.clone(),
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -25,12 +25,6 @@ pub enum ConnectionCommand {
|
||||
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
|
||||
// transmission lanes.
|
||||
Close(ConnectionId),
|
||||
|
||||
// In the network requester for example, we usually want to broadcast active connections
|
||||
// regularly, so we know what connections we need to request lane queue lengths for from the
|
||||
// client.
|
||||
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
|
||||
ActiveConnections(Vec<ConnectionId>),
|
||||
}
|
||||
|
||||
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
|
||||
|
||||
@@ -7,12 +7,8 @@ use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
time::Duration,
|
||||
};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use task::TaskClient;
|
||||
use tokio::time;
|
||||
|
||||
/// A generic message produced after reading from a socket/connection. It includes data that was
|
||||
/// actually read alongside boolean indicating whether the connection got closed so that
|
||||
@@ -87,10 +83,6 @@ pub struct Controller {
|
||||
// Broadcast closed connections
|
||||
client_connection_tx: ConnectionCommandSender,
|
||||
|
||||
// The controller can broadcast active connections. This is useful in the network-requester
|
||||
// where its used to query the client for lane queue lengths
|
||||
broadcast_connections: BroadcastActiveConnections,
|
||||
|
||||
// TODO: this can potentially be abused to ddos and kill provider. Not sure at this point
|
||||
// how to handle it more gracefully
|
||||
|
||||
@@ -104,7 +96,6 @@ pub struct Controller {
|
||||
impl Controller {
|
||||
pub fn new(
|
||||
client_connection_tx: ConnectionCommandSender,
|
||||
broadcast_connections: BroadcastActiveConnections,
|
||||
shutdown: TaskClient,
|
||||
) -> (Self, ControllerSender) {
|
||||
let (sender, receiver) = mpsc::unbounded();
|
||||
@@ -114,7 +105,6 @@ impl Controller {
|
||||
receiver,
|
||||
recently_closed: HashSet::new(),
|
||||
client_connection_tx,
|
||||
broadcast_connections,
|
||||
pending_messages: HashMap::new(),
|
||||
shutdown,
|
||||
},
|
||||
@@ -165,15 +155,6 @@ impl Controller {
|
||||
}
|
||||
}
|
||||
|
||||
fn broadcast_active_connections(&mut self) {
|
||||
// What about the recently closed ones? Hopefully we can ignore them ...
|
||||
let conn_ids = self.active_connections.keys().copied().collect();
|
||||
|
||||
self.client_connection_tx
|
||||
.unbounded_send(ConnectionCommand::ActiveConnections(conn_ids))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
|
||||
if !payload.is_empty() {
|
||||
@@ -230,8 +211,6 @@ impl Controller {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
let mut interval = time::interval(Duration::from_millis(500));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = self.receiver.next() => match command {
|
||||
@@ -247,11 +226,6 @@ impl Controller {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
if self.broadcast_connections == BroadcastActiveConnections::On {
|
||||
self.broadcast_active_connections();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
self.shutdown.recv_timeout().await;
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) mod tests {
|
||||
use coconut_dkg_common::dealer::DealerDetails;
|
||||
use cosmwasm_std::Addr;
|
||||
use dkg::bte::keys::KeyPair as DkgKeyPair;
|
||||
use dkg::bte::Params;
|
||||
use dkg::bte::{Params, PublicKeyWithProof};
|
||||
use rand::rngs::OsRng;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
@@ -186,8 +186,18 @@ pub(crate) mod tests {
|
||||
let mut bytes = bs58::decode(details.bte_public_key_with_proof.clone())
|
||||
.into_vec()
|
||||
.unwrap();
|
||||
let last_byte = bytes.last_mut().unwrap();
|
||||
*last_byte += 1;
|
||||
// Find another value for last byte that still deserializes to a public key with proof
|
||||
let initial_byte = *bytes.last_mut().unwrap();
|
||||
loop {
|
||||
let last_byte = bytes.last_mut().unwrap();
|
||||
let (ret, _) = last_byte.overflowing_add(1);
|
||||
*last_byte = ret;
|
||||
// stop when we find that value, or if we do a full round trip of u8 values
|
||||
// and can't find one, in which case this test is invalid
|
||||
if PublicKeyWithProof::try_from_bytes(&bytes).is_ok() || ret == initial_byte {
|
||||
break;
|
||||
}
|
||||
}
|
||||
details.bte_public_key_with_proof = bs58::encode(&bytes).into_string();
|
||||
});
|
||||
|
||||
|
||||
@@ -1312,9 +1312,9 @@ hosted-git-info@^2.1.4:
|
||||
integrity sha512-mxIDAb9Lsm6DoOJ7xH+5+X4y1LU/4Hi50L9C5sIswK3JzULS4bwk1FvjdBgvYR4bzT4tuUQiC15FE2f5HbLvYw==
|
||||
|
||||
http-cache-semantics@^4.0.0:
|
||||
version "4.1.0"
|
||||
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.0.tgz#49e91c5cbf36c9b94bcfcd71c23d5249ec74e390"
|
||||
integrity sha512-carPklcUh7ROWRK7Cv27RPtdhYhUsela/ue5/jKzjegVvXDqM2ILE9Q2BGn9JZJh1g87cp56su/FgQSzcWS8cQ==
|
||||
version "4.1.1"
|
||||
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.1.tgz#abe02fcb2985460bf0323be664436ec3476a6d5a"
|
||||
integrity sha512-er295DKPVsV82j5kw1Gjt+ADA/XYHsajl82cGNQG2eyoPkvgUhX+nDIyelzhIWbbsXP39EHcI6l5tYs2FYqYXQ==
|
||||
|
||||
http2-wrapper@^1.0.0-beta.5.2:
|
||||
version "1.0.3"
|
||||
@@ -2471,9 +2471,9 @@ type-fest@^0.21.3:
|
||||
integrity sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==
|
||||
|
||||
ua-parser-js@^0.7.21:
|
||||
version "0.7.28"
|
||||
resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.28.tgz#8ba04e653f35ce210239c64661685bf9121dec31"
|
||||
integrity sha512-6Gurc1n//gjp9eQNXjD9O3M/sMwVtN5S8Lv9bvOYBfKfDNiIIhqiyi01vMBO45u4zkDE420w/e0se7Vs+sIg+g==
|
||||
version "0.7.33"
|
||||
resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.33.tgz#1d04acb4ccef9293df6f70f2c3d22f3030d8b532"
|
||||
integrity sha512-s8ax/CeZdK9R/56Sui0WM6y9OFREJarMRHqLB2EwkovemBxNQ+Bqu8GAsUnVcXKgphb++ghr/B2BZx4mahujPw==
|
||||
|
||||
unbzip2-stream@1.3.3:
|
||||
version "1.3.3"
|
||||
|
||||
@@ -36,8 +36,8 @@ mod connection_state;
|
||||
mod keys;
|
||||
mod paths;
|
||||
|
||||
pub use client::{MixnetClient, MixnetClientBuilder};
|
||||
pub use client_core::config::GatewayEndpointConfig;
|
||||
pub use client::{MixnetClient, MixnetClientBuilder, MixnetClientSender};
|
||||
pub use client_core::{client::inbound_messages::InputMessage, config::GatewayEndpointConfig};
|
||||
pub use config::Config;
|
||||
pub use keys::{Keys, KeysArc};
|
||||
pub use nymsphinx::{
|
||||
|
||||
@@ -268,7 +268,6 @@ pub struct MixnetClient {
|
||||
|
||||
/// The current state of the client that is exposed to the user. This includes things like
|
||||
/// current message send queue length.
|
||||
#[allow(dead_code)]
|
||||
client_state: ClientState,
|
||||
|
||||
/// A channel for messages arriving from the mixnet after they have been reconstructed.
|
||||
@@ -421,6 +420,23 @@ impl MixnetClient {
|
||||
&self.nym_address
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`MixnetClientSender`]
|
||||
pub fn sender(&self) -> MixnetClientSender {
|
||||
MixnetClientSender {
|
||||
client_input: self.client_input.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`ConnectionCommandSender`].
|
||||
pub fn connection_command_sender(&self) -> client_connections::ConnectionCommandSender {
|
||||
self.client_input.connection_command_sender.clone()
|
||||
}
|
||||
|
||||
/// Get a shallow clone of [`LaneQueueLengths`].
|
||||
pub fn shared_lane_queue_lengths(&self) -> client_connections::LaneQueueLengths {
|
||||
self.client_state.shared_lane_queue_lengths.clone()
|
||||
}
|
||||
|
||||
/// Sends stringy data to the supplied Nym address
|
||||
pub async fn send_str(&self, address: Recipient, message: &str) {
|
||||
let message_bytes = message.to_string().into_bytes();
|
||||
@@ -430,7 +446,7 @@ impl MixnetClient {
|
||||
/// Sends stringy data to the supplied Nym address, and skip sending reply-SURBs
|
||||
pub async fn send_str_direct(&self, address: Recipient, message: &str) {
|
||||
let message_bytes = message.to_string().into_bytes();
|
||||
self.send_bytes(address, message_bytes).await;
|
||||
self.send_bytes_direct(address, message_bytes).await;
|
||||
}
|
||||
|
||||
/// Sends bytes to the supplied Nym address
|
||||
@@ -451,13 +467,12 @@ impl MixnetClient {
|
||||
pub async fn send_bytes(&self, address: Recipient, message: Vec<u8>) {
|
||||
let lane = TransmissionLane::General;
|
||||
let input_msg = InputMessage::new_anonymous(address, message, 20, lane);
|
||||
if self
|
||||
.client_input
|
||||
.input_sender
|
||||
.send(input_msg)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
self.send_input_message(input_msg).await
|
||||
}
|
||||
|
||||
/// Sends a [`InputMessage`] to the mixnet.
|
||||
async fn send_input_message(&self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
}
|
||||
}
|
||||
@@ -501,3 +516,15 @@ impl MixnetClient {
|
||||
self.task_manager.wait_for_shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MixnetClientSender {
|
||||
client_input: ClientInput,
|
||||
}
|
||||
|
||||
impl MixnetClientSender {
|
||||
pub async fn send_input_message(&mut self, message: InputMessage) {
|
||||
if self.client_input.send(message).await.is_err() {
|
||||
log::error!("Failed to send message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,9 +31,10 @@ tokio-tungstenite = "0.17.2"
|
||||
# internal
|
||||
client-connections = { path = "../../common/client-connections" }
|
||||
completions = { path = "../../common/completions" }
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
logging = { path = "../../common/logging"}
|
||||
network-defaults = { path = "../../common/network-defaults" }
|
||||
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
|
||||
nymsphinx = { path = "../../common/nymsphinx" }
|
||||
ordered-buffer = {path = "../../common/socks5/ordered-buffer"}
|
||||
proxy-helpers = { path = "../../common/socks5/proxy-helpers" }
|
||||
socks5-requests = { path = "../../common/socks5/requests" }
|
||||
|
||||
@@ -4,22 +4,16 @@ use crate::allowed_hosts;
|
||||
use crate::allowed_hosts::OutboundRequestFilter;
|
||||
use crate::error::NetworkRequesterError;
|
||||
use crate::statistics::ServiceStatisticsCollector;
|
||||
use crate::websocket;
|
||||
use crate::websocket::TSWebsocketStream;
|
||||
use crate::{reply, socks5};
|
||||
use client_connections::{
|
||||
ConnectionCommand, ConnectionCommandReceiver, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use client_connections::LaneQueueLengths;
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nymsphinx::receiver::ReconstructedMessage;
|
||||
use proxy_helpers::connection_controller::{
|
||||
BroadcastActiveConnections, Controller, ControllerCommand, ControllerSender,
|
||||
use proxy_helpers::{
|
||||
connection_controller::{Controller, ControllerCommand, ControllerSender},
|
||||
proxy_runner::{MixProxyReader, MixProxySender},
|
||||
};
|
||||
use proxy_helpers::proxy_runner::{MixProxyReader, MixProxySender};
|
||||
use socks5_requests::{
|
||||
ConnectRequest, ConnectionId, Message as Socks5Message, NetworkRequesterResponse, Request,
|
||||
Response,
|
||||
@@ -28,14 +22,11 @@ use statistics_common::collector::StatisticsSender;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use task::TaskClient;
|
||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
||||
use websocket_requests::{requests::ClientRequest, responses::ServerResponse};
|
||||
|
||||
// Since it's an atomic, it's safe to be kept static and shared across threads
|
||||
static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub struct ServiceProvider {
|
||||
websocket_address: String,
|
||||
outbound_request_filter: OutboundRequestFilter,
|
||||
open_proxy: bool,
|
||||
enable_statistics: bool,
|
||||
@@ -44,7 +35,6 @@ pub struct ServiceProvider {
|
||||
|
||||
impl ServiceProvider {
|
||||
pub async fn new(
|
||||
websocket_address: String,
|
||||
open_proxy: bool,
|
||||
enable_statistics: bool,
|
||||
stats_provider_addr: Option<Recipient>,
|
||||
@@ -67,7 +57,6 @@ impl ServiceProvider {
|
||||
|
||||
let outbound_request_filter = OutboundRequestFilter::new(allowed_hosts, unknown_hosts);
|
||||
ServiceProvider {
|
||||
websocket_address,
|
||||
outbound_request_filter,
|
||||
open_proxy,
|
||||
enable_statistics,
|
||||
@@ -78,10 +67,9 @@ impl ServiceProvider {
|
||||
/// Listens for any messages from `mix_reader` that should be written back to the mix network
|
||||
/// via the `websocket_writer`.
|
||||
async fn mixnet_response_listener(
|
||||
mut websocket_writer: SplitSink<TSWebsocketStream, Message>,
|
||||
mut mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
mut mix_reader: MixProxyReader<(Socks5Message, reply::ReturnAddress)>,
|
||||
stats_collector: Option<ServiceStatisticsCollector>,
|
||||
mut client_connection_rx: ConnectionCommandReceiver,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -102,102 +90,19 @@ impl ServiceProvider {
|
||||
}
|
||||
}
|
||||
|
||||
// make 'request' to native-websocket client
|
||||
let conn_id = msg.conn_id();
|
||||
let response_message = return_address.send_back_to(msg.into_bytes(), conn_id);
|
||||
|
||||
let message = Message::Binary(response_message.serialize());
|
||||
websocket_writer.send(message).await.unwrap();
|
||||
mixnet_client_sender.send_input_message(response_message).await;
|
||||
} else {
|
||||
log::error!("Exiting: channel closed!");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some(command) = client_connection_rx.next() => {
|
||||
match command {
|
||||
ConnectionCommand::Close(id) => {
|
||||
let msg = ClientRequest::ClosedConnection(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
ConnectionCommand::ActiveConnections(ids) => {
|
||||
// We can optimize this by sending a single request, but this is
|
||||
// usually in the low single digits, max a few tens, so we leave that
|
||||
// for a rainy day.
|
||||
// Also that means fiddling with the currently manual
|
||||
// serialize/deserialize we do with ClientRequests ...
|
||||
for id in ids {
|
||||
log::trace!("Requesting lane queue length for: {}", id);
|
||||
let msg = ClientRequest::GetLaneQueueLength(id);
|
||||
let ws_msg = Message::Binary(msg.serialize());
|
||||
websocket_writer.send(ws_msg).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_lane_queue_length_response(
|
||||
lane_queue_lengths: &LaneQueueLengths,
|
||||
lane: u64,
|
||||
queue_length: usize,
|
||||
) {
|
||||
log::trace!("Received LaneQueueLength lane: {lane}, queue_length: {queue_length}");
|
||||
if let Ok(mut lane_queue_lengths) = lane_queue_lengths.lock() {
|
||||
let lane = TransmissionLane::ConnectionId(lane);
|
||||
lane_queue_lengths.map.insert(lane, queue_length);
|
||||
} else {
|
||||
log::warn!("Unable to lock lane queue lengths, skipping updating received lane length")
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_websocket_message(
|
||||
websocket_reader: &mut SplitStream<TSWebsocketStream>,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
) -> Option<ReconstructedMessage> {
|
||||
while let Some(msg) = websocket_reader.next().await {
|
||||
let data = match msg {
|
||||
Ok(msg) => msg.into_data(),
|
||||
Err(err) => {
|
||||
log::error!("Failed to read from the websocket: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// try to recover the actual message from the mix network...
|
||||
let deserialized_message = match ServerResponse::deserialize(&data) {
|
||||
Ok(deserialized) => deserialized,
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Failed to deserialize received websocket message! - {}",
|
||||
err
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let received = match deserialized_message {
|
||||
ServerResponse::Received(received) => received,
|
||||
ServerResponse::LaneQueueLength { lane, queue_length } => {
|
||||
Self::handle_lane_queue_length_response(
|
||||
&lane_queue_lengths,
|
||||
lane,
|
||||
queue_length,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
ServerResponse::Error(err) => {
|
||||
panic!("received error from native client! - {err}")
|
||||
}
|
||||
_ => unimplemented!("probably should never be reached?"),
|
||||
};
|
||||
return Some(received);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
async fn start_proxy(
|
||||
conn_id: ConnectionId,
|
||||
remote_addr: String,
|
||||
@@ -395,10 +300,8 @@ impl ServiceProvider {
|
||||
|
||||
/// Start all subsystems
|
||||
pub async fn run(&mut self) -> Result<(), NetworkRequesterError> {
|
||||
let websocket_stream = self.connect_websocket(&self.websocket_address).await?;
|
||||
|
||||
// split the websocket so that we could read and write from separate threads
|
||||
let (websocket_writer, mut websocket_reader) = websocket_stream.split();
|
||||
// Connect to the mixnet
|
||||
let mut mixnet_client = nym_sdk::mixnet::MixnetClient::connect().await.unwrap();
|
||||
|
||||
// channels responsible for managing messages that are to be sent to the mix network. The receiver is
|
||||
// going to be used by `mixnet_response_listener`
|
||||
@@ -408,21 +311,9 @@ impl ServiceProvider {
|
||||
// Used to notify tasks to shutdown. Not all tasks fully supports this (yet).
|
||||
let shutdown = task::TaskManager::default();
|
||||
|
||||
// Channel for announcing client connection state by the controller.
|
||||
// The `mixnet_response_listener` will use this to either report closed connection to the
|
||||
// client or request lane queue lengths.
|
||||
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
|
||||
|
||||
// Shared queue length data. Published by the `OutQueueController` in the client, and used
|
||||
// primarily to throttle incoming connections
|
||||
let shared_lane_queue_lengths = LaneQueueLengths::new();
|
||||
|
||||
// Controller for managing all active connections.
|
||||
// We provide it with a ShutdownListener since it requires it, even though for the network
|
||||
// requester shutdown signalling is not yet fully implemented.
|
||||
let (mut active_connections_controller, mut controller_sender) = Controller::new(
|
||||
client_connection_tx,
|
||||
BroadcastActiveConnections::On,
|
||||
mixnet_client.connection_command_sender(),
|
||||
shutdown.subscribe(),
|
||||
);
|
||||
|
||||
@@ -446,58 +337,37 @@ impl ServiceProvider {
|
||||
};
|
||||
|
||||
let stats_collector_clone = stats_collector.clone();
|
||||
let mixnet_client_sender = mixnet_client.sender();
|
||||
|
||||
// start the listener for mix messages
|
||||
tokio::spawn(async move {
|
||||
Self::mixnet_response_listener(
|
||||
websocket_writer,
|
||||
mixnet_client_sender,
|
||||
mix_input_receiver,
|
||||
stats_collector_clone,
|
||||
client_connection_rx,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
let nym_address = mixnet_client.nym_address();
|
||||
log::info!("Our nym address is: {nym_address}");
|
||||
log::info!("All systems go. Press CTRL-C to stop the server.");
|
||||
// for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message)
|
||||
loop {
|
||||
let Some(received) = Self::read_websocket_message(
|
||||
&mut websocket_reader,
|
||||
shared_lane_queue_lengths.clone()
|
||||
|
||||
while let Some(received) = mixnet_client.wait_for_messages().await {
|
||||
for received in received {
|
||||
self.handle_proxy_message(
|
||||
received,
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
mixnet_client.shared_lane_queue_lengths(),
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await
|
||||
else {
|
||||
log::error!("The websocket stream has finished!");
|
||||
return Err(NetworkRequesterError::ConnectionClosed);
|
||||
};
|
||||
|
||||
self.handle_proxy_message(
|
||||
received,
|
||||
&mut controller_sender,
|
||||
&mix_input_sender,
|
||||
shared_lane_queue_lengths.clone(),
|
||||
stats_collector.clone(),
|
||||
shutdown.subscribe(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Make the websocket connection so we can receive incoming Mixnet messages.
|
||||
async fn connect_websocket(
|
||||
&self,
|
||||
uri: &str,
|
||||
) -> Result<TSWebsocketStream, NetworkRequesterError> {
|
||||
match websocket::Connection::new(uri).connect().await {
|
||||
Ok(ws_stream) => {
|
||||
log::info!("* connected to local websocket server at {}", uri);
|
||||
Ok(ws_stream)
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"Error: websocket connection attempt failed, is the Nym client running?"
|
||||
);
|
||||
Err(err.into())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
log::error!("Network requester exited unexpectedly");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,5 @@
|
||||
use crate::websocket::WebsocketConnectionError;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum NetworkRequesterError {
|
||||
#[error("I/O error: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("Websocket error")]
|
||||
WebsocketConnectionError(#[from] WebsocketConnectionError),
|
||||
|
||||
#[error("Websocket connection closed")]
|
||||
ConnectionClosed,
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ mod error;
|
||||
mod reply;
|
||||
mod socks5;
|
||||
mod statistics;
|
||||
mod websocket;
|
||||
|
||||
const ENABLE_STATISTICS: &str = "enable-statistics";
|
||||
|
||||
@@ -56,16 +55,8 @@ impl Run {
|
||||
.transpose()
|
||||
.unwrap_or(None);
|
||||
|
||||
let websocket_address = format!(
|
||||
"ws://localhost:{}",
|
||||
self.websocket_port
|
||||
.as_ref()
|
||||
.unwrap_or(&network_defaults::DEFAULT_WEBSOCKET_LISTENING_PORT.to_string())
|
||||
);
|
||||
|
||||
log::info!("Starting socks5 service provider");
|
||||
let mut server = core::ServiceProvider::new(
|
||||
websocket_address,
|
||||
self.open_proxy,
|
||||
self.enable_statistics,
|
||||
stats_provider_addr,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use client_connections::TransmissionLane;
|
||||
use nym_sdk::mixnet::InputMessage;
|
||||
use nymsphinx::addressing::clients::Recipient;
|
||||
use nymsphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use websocket_requests::requests::ClientRequest;
|
||||
|
||||
/// A return address is a way to send a message back to the original sender. It can be either
|
||||
/// an explicitly known Recipient, or a surb AnonymousSenderTag.
|
||||
@@ -24,17 +25,17 @@ impl ReturnAddress {
|
||||
None
|
||||
}
|
||||
|
||||
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> ClientRequest {
|
||||
pub(super) fn send_back_to(self, message: Vec<u8>, connection_id: u64) -> InputMessage {
|
||||
match self {
|
||||
ReturnAddress::Known(recipient) => ClientRequest::Send {
|
||||
ReturnAddress::Known(recipient) => InputMessage::Regular {
|
||||
recipient: *recipient,
|
||||
message,
|
||||
connection_id: Some(connection_id),
|
||||
data: message,
|
||||
lane: TransmissionLane::ConnectionId(connection_id),
|
||||
},
|
||||
ReturnAddress::Anonymous(sender_tag) => ClientRequest::Reply {
|
||||
message,
|
||||
sender_tag,
|
||||
connection_id: Some(connection_id),
|
||||
ReturnAddress::Anonymous(sender_tag) => InputMessage::Reply {
|
||||
recipient_tag: sender_tag,
|
||||
data: message,
|
||||
lane: TransmissionLane::ConnectionId(connection_id),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::{connect_async, MaybeTlsStream};
|
||||
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub(crate) type TSWebsocketStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
|
||||
pub struct Connection {
|
||||
uri: String,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(uri: &str) -> Connection {
|
||||
Connection {
|
||||
uri: String::from(uri),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(&self) -> Result<TSWebsocketStream, WebsocketConnectionError> {
|
||||
match connect_async(&self.uri).await {
|
||||
Ok((ws_stream, _)) => Ok(ws_stream),
|
||||
Err(e) => Err(WebsocketConnectionError::ConnectionNotEstablished(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WebsocketConnectionError {
|
||||
#[error("Connection not established")]
|
||||
ConnectionNotEstablished(tokio_tungstenite::tungstenite::Error),
|
||||
}
|
||||
@@ -10855,9 +10855,9 @@ htmlparser2@^6.1.0:
|
||||
entities "^2.0.0"
|
||||
|
||||
http-cache-semantics@^4.1.0:
|
||||
version "4.1.0"
|
||||
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.0.tgz#49e91c5cbf36c9b94bcfcd71c23d5249ec74e390"
|
||||
integrity sha512-carPklcUh7ROWRK7Cv27RPtdhYhUsela/ue5/jKzjegVvXDqM2ILE9Q2BGn9JZJh1g87cp56su/FgQSzcWS8cQ==
|
||||
version "4.1.1"
|
||||
resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-4.1.1.tgz#abe02fcb2985460bf0323be664436ec3476a6d5a"
|
||||
integrity sha512-er295DKPVsV82j5kw1Gjt+ADA/XYHsajl82cGNQG2eyoPkvgUhX+nDIyelzhIWbbsXP39EHcI6l5tYs2FYqYXQ==
|
||||
|
||||
http-deceiver@^1.2.7:
|
||||
version "1.2.7"
|
||||
|
||||
Reference in New Issue
Block a user