Compare commits

...

1 Commits

Author SHA1 Message Date
Jon Häggblad ec9e3bb5ab client-core: add ClientState to BaseClient 2022-12-21 14:11:57 +01:00
4 changed files with 46 additions and 13 deletions
@@ -48,10 +48,14 @@ pub struct ClientInput {
} }
pub struct ClientOutput { pub struct ClientOutput {
pub shared_lane_queue_lengths: LaneQueueLengths,
pub received_buffer_request_sender: ReceivedBufferRequestSender, pub received_buffer_request_sender: ReceivedBufferRequestSender,
} }
pub struct ClientState {
pub shared_lane_queue_lengths: LaneQueueLengths,
pub reply_controller_sender: ReplyControllerSender,
}
pub enum ClientInputStatus { pub enum ClientInputStatus {
AwaitingProducer { client_input: ClientInput }, AwaitingProducer { client_input: ClientInput },
Connected, Connected,
@@ -467,11 +471,13 @@ where
}, },
client_output: ClientOutputStatus::AwaitingConsumer { client_output: ClientOutputStatus::AwaitingConsumer {
client_output: ClientOutput { client_output: ClientOutput {
shared_lane_queue_lengths,
received_buffer_request_sender, received_buffer_request_sender,
}, },
}, },
client_state: ClientState {
shared_lane_queue_lengths,
reply_controller_sender, reply_controller_sender,
},
task_manager, task_manager,
}) })
} }
@@ -480,9 +486,7 @@ where
pub struct BaseClient { pub struct BaseClient {
pub client_input: ClientInputStatus, pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus, pub client_output: ClientOutputStatus,
pub client_state: ClientState,
// it feels very wrong to put this channel here, but I can't think of any other way of passing it to the native client
pub reply_controller_sender: ReplyControllerSender,
pub task_manager: TaskManager, pub task_manager: TaskManager,
} }
@@ -99,6 +99,24 @@ impl ReplyControllerSender {
} }
} }
pub struct ReplyQueueLengths {
reply_controller_sender: ReplyControllerSender,
}
impl ReplyQueueLengths {
pub fn new(reply_controller_sender: ReplyControllerSender) -> Self {
Self {
reply_controller_sender,
}
}
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
self.reply_controller_sender
.get_lane_queue_length(connection_id)
.await
}
}
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>; pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
#[derive(Debug)] #[derive(Debug)]
+9 -5
View File
@@ -8,12 +8,11 @@ use crate::error::ClientError;
use crate::websocket; use crate::websocket;
use client_connections::TransmissionLane; use client_connections::TransmissionLane;
use client_core::client::base_client::{ use client_core::client::base_client::{
non_wasm_helpers, BaseClientBuilder, ClientInput, ClientOutput, non_wasm_helpers, BaseClientBuilder, ClientInput, ClientOutput, ClientState,
}; };
use client_core::client::inbound_messages::InputMessage; use client_core::client::inbound_messages::InputMessage;
use client_core::client::key_manager::KeyManager; use client_core::client::key_manager::KeyManager;
use client_core::client::received_buffer::{ReceivedBufferMessage, ReconstructedMessagesReceiver}; use client_core::client::received_buffer::{ReceivedBufferMessage, ReconstructedMessagesReceiver};
use client_core::client::replies::reply_controller::requests::ReplyControllerSender;
use client_core::config::persistence::key_pathfinder::ClientKeyPathfinder; use client_core::config::persistence::key_pathfinder::ClientKeyPathfinder;
use futures::channel::mpsc; use futures::channel::mpsc;
use gateway_client::bandwidth::BandwidthController; use gateway_client::bandwidth::BandwidthController;
@@ -87,8 +86,8 @@ impl SocketClient {
config: &Config, config: &Config,
client_input: ClientInput, client_input: ClientInput,
client_output: ClientOutput, client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient, self_address: &Recipient,
reply_controller_sender: ReplyControllerSender,
shutdown: task::TaskClient, shutdown: task::TaskClient,
) { ) {
info!("Starting websocket listener..."); info!("Starting websocket listener...");
@@ -99,10 +98,14 @@ impl SocketClient {
} = client_input; } = client_input;
let ClientOutput { let ClientOutput {
shared_lane_queue_lengths,
received_buffer_request_sender, received_buffer_request_sender,
} = client_output; } = client_output;
let ClientState {
shared_lane_queue_lengths,
reply_controller_sender,
} = client_state;
let websocket_handler = websocket::HandlerBuilder::new( let websocket_handler = websocket::HandlerBuilder::new(
input_sender, input_sender,
connection_command_sender, connection_command_sender,
@@ -151,13 +154,14 @@ impl SocketClient {
let mut started_client = base_builder.start_base().await?; let mut started_client = base_builder.start_base().await?;
let client_input = started_client.client_input.register_producer(); let client_input = started_client.client_input.register_producer();
let client_output = started_client.client_output.register_consumer(); let client_output = started_client.client_output.register_consumer();
let client_state = started_client.client_state;
Self::start_websocket_listener( Self::start_websocket_listener(
&self.config, &self.config,
client_input, client_input,
client_output, client_output,
client_state,
&self_address, &self_address,
started_client.reply_controller_sender,
started_client.task_manager.subscribe(), started_client.task_manager.subscribe(),
); );
+9 -2
View File
@@ -9,7 +9,7 @@ use crate::socks::{
server::SphinxSocksServer, server::SphinxSocksServer,
}; };
use client_core::client::base_client::{ use client_core::client::base_client::{
non_wasm_helpers, BaseClientBuilder, ClientInput, ClientOutput, non_wasm_helpers, BaseClientBuilder, ClientInput, ClientOutput, ClientState,
}; };
use client_core::client::key_manager::KeyManager; use client_core::client::key_manager::KeyManager;
use client_core::config::persistence::key_pathfinder::ClientKeyPathfinder; use client_core::config::persistence::key_pathfinder::ClientKeyPathfinder;
@@ -95,6 +95,7 @@ impl NymClient {
config: &Config, config: &Config,
client_input: ClientInput, client_input: ClientInput,
client_output: ClientOutput, client_output: ClientOutput,
client_status: ClientState,
self_address: Recipient, self_address: Recipient,
shutdown: TaskClient, shutdown: TaskClient,
) { ) {
@@ -108,10 +109,14 @@ impl NymClient {
} = client_input; } = client_input;
let ClientOutput { let ClientOutput {
shared_lane_queue_lengths,
received_buffer_request_sender, received_buffer_request_sender,
} = client_output; } = client_output;
let ClientState {
shared_lane_queue_lengths,
reply_controller_sender: _,
} = client_status;
let authenticator = Authenticator::new(auth_methods, allowed_users); let authenticator = Authenticator::new(auth_methods, allowed_users);
let mut sphinx_socks = SphinxSocksServer::new( let mut sphinx_socks = SphinxSocksServer::new(
config.get_listening_port(), config.get_listening_port(),
@@ -218,11 +223,13 @@ impl NymClient {
let mut started_client = base_builder.start_base().await?; let mut started_client = base_builder.start_base().await?;
let client_input = started_client.client_input.register_producer(); let client_input = started_client.client_input.register_producer();
let client_output = started_client.client_output.register_consumer(); let client_output = started_client.client_output.register_consumer();
let client_state = started_client.client_state;
Self::start_socks5_listener( Self::start_socks5_listener(
&self.config, &self.config,
client_input, client_input,
client_output, client_output,
client_state,
self_address, self_address,
started_client.task_manager.subscribe(), started_client.task_manager.subscribe(),
); );