Merge remote-tracking branch 'origin/develop' into release/2025.16-halloumi
This commit is contained in:
@@ -11,7 +11,7 @@ use nym_client_core::client::base_client::{
|
||||
BaseClientBuilder, ClientInput, ClientOutput, ClientState,
|
||||
};
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::TaskHandle;
|
||||
use nym_task::ShutdownManager;
|
||||
use nym_validator_client::QueryHttpRpcNyxdClient;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
@@ -29,6 +29,8 @@ pub struct SocketClient {
|
||||
|
||||
/// Optional path to a .json file containing standalone network details.
|
||||
custom_mixnet: Option<PathBuf>,
|
||||
|
||||
shutdown_manager: ShutdownManager,
|
||||
}
|
||||
|
||||
impl SocketClient {
|
||||
@@ -40,6 +42,7 @@ impl SocketClient {
|
||||
SocketClient {
|
||||
config,
|
||||
custom_mixnet,
|
||||
shutdown_manager: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +52,7 @@ impl SocketClient {
|
||||
client_output: ClientOutput,
|
||||
client_state: ClientState,
|
||||
self_address: &Recipient,
|
||||
task_client: nym_task::TaskClient,
|
||||
shutdown_token: nym_task::ShutdownToken,
|
||||
packet_type: PacketType,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
@@ -77,24 +80,24 @@ impl SocketClient {
|
||||
shared_lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
Some(packet_type),
|
||||
task_client.fork("websocket_handler"),
|
||||
shutdown_token.clone(),
|
||||
);
|
||||
|
||||
websocket::Listener::new(
|
||||
config.socket.host,
|
||||
config.socket.listening_port,
|
||||
task_client.with_suffix("websocket_listener"),
|
||||
shutdown_token.child_token(),
|
||||
)
|
||||
.start(websocket_handler);
|
||||
}
|
||||
|
||||
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
|
||||
pub async fn run_socket_forever(self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let shutdown = self.start_socket().await?;
|
||||
let mut shutdown = self.start_socket().await?;
|
||||
|
||||
let res = shutdown.wait_for_shutdown().await;
|
||||
shutdown.run_until_shutdown().await;
|
||||
log::info!("Stopping nym-client");
|
||||
res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn initialise_storage(&self) -> Result<OnDiskPersistent, ClientError> {
|
||||
@@ -119,6 +122,7 @@ impl SocketClient {
|
||||
|
||||
let mut base_client =
|
||||
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
|
||||
.with_shutdown(self.shutdown_manager.shutdown_tracker_owned())
|
||||
.with_user_agent(user_agent);
|
||||
|
||||
if let Some(custom_mixnet) = &self.custom_mixnet {
|
||||
@@ -128,7 +132,7 @@ impl SocketClient {
|
||||
Ok(base_client)
|
||||
}
|
||||
|
||||
pub async fn start_socket(self) -> Result<TaskHandle, ClientError> {
|
||||
pub async fn start_socket(self) -> Result<ShutdownManager, ClientError> {
|
||||
if !self.config.socket.socket_type.is_websocket() {
|
||||
return Err(ClientError::InvalidSocketMode);
|
||||
}
|
||||
@@ -147,13 +151,13 @@ impl SocketClient {
|
||||
client_output,
|
||||
client_state,
|
||||
&self_address,
|
||||
started_client.task_handle.get_handle(),
|
||||
self.shutdown_manager.child_shutdown_token(),
|
||||
packet_type,
|
||||
);
|
||||
|
||||
info!("Client startup finished!");
|
||||
info!("The address of this client is: {self_address}");
|
||||
|
||||
Ok(started_client.task_handle)
|
||||
Ok(self.shutdown_manager)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::connections::{
|
||||
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use nym_task::TaskClient;
|
||||
use nym_task::ShutdownToken;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
@@ -44,7 +44,7 @@ pub(crate) struct HandlerBuilder {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
shutdown_token: ShutdownToken,
|
||||
}
|
||||
|
||||
impl HandlerBuilder {
|
||||
@@ -57,7 +57,7 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
shutdown_token: ShutdownToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
msg_input,
|
||||
@@ -67,14 +67,13 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
packet_type,
|
||||
task_client,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make sure we only ever have one active handler
|
||||
pub fn create_active_handler(&self) -> Handler {
|
||||
let mut task_client = self.task_client.fork("active_handler");
|
||||
task_client.disarm();
|
||||
let shutdown_token = self.shutdown_token.clone();
|
||||
Handler {
|
||||
msg_input: self.msg_input.clone(),
|
||||
client_connection_tx: self.client_connection_tx.clone(),
|
||||
@@ -85,7 +84,7 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||
reply_controller_sender: self.reply_controller_sender.clone(),
|
||||
packet_type: self.packet_type,
|
||||
task_client,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,19 +99,14 @@ pub(crate) struct Handler {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
shutdown_token: ShutdownToken,
|
||||
}
|
||||
|
||||
impl Drop for Handler {
|
||||
fn drop(&mut self) {
|
||||
if let Err(err) = self
|
||||
let _ = self
|
||||
.buffer_requester
|
||||
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("failed to disconnect the receiver from the buffer: {err}");
|
||||
}
|
||||
}
|
||||
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +136,7 @@ impl Handler {
|
||||
{
|
||||
Ok(length) => length,
|
||||
Err(err) => {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!(
|
||||
"Failed to get reply queue length for connection {connection_id}: {err}"
|
||||
);
|
||||
@@ -192,7 +186,7 @@ impl Handler {
|
||||
// the ack control is now responsible for chunking, etc.
|
||||
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
@@ -225,7 +219,7 @@ impl Handler {
|
||||
let input_msg =
|
||||
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send anonymous message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
@@ -253,7 +247,7 @@ impl Handler {
|
||||
|
||||
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send reply message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
@@ -275,7 +269,7 @@ impl Handler {
|
||||
.client_connection_tx
|
||||
.unbounded_send(ConnectionCommand::Close(connection_id))
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send close connection command: {err}");
|
||||
}
|
||||
}
|
||||
@@ -394,11 +388,14 @@ impl Handler {
|
||||
}
|
||||
|
||||
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
|
||||
let mut task_client = self.task_client.fork("select");
|
||||
task_client.disarm();
|
||||
let shutdown_token = self.shutdown_token.clone();
|
||||
|
||||
while !task_client.is_shutdown() {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_token.cancelled() => {
|
||||
log::trace!("Websocket handler: Received shutdown");
|
||||
break;
|
||||
}
|
||||
// we can either get a client request from the websocket
|
||||
socket_msg = self.next_websocket_request() => {
|
||||
if socket_msg.is_none() {
|
||||
@@ -436,9 +433,6 @@ impl Handler {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = task_client.recv() => {
|
||||
log::trace!("Websocket handler: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("Websocket handler: Exiting");
|
||||
@@ -464,7 +458,7 @@ impl Handler {
|
||||
reconstructed_sender,
|
||||
))
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("failed to announce the receiver to the buffer: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use super::handler::HandlerBuilder;
|
||||
use log::*;
|
||||
use nym_task::TaskClient;
|
||||
use nym_task::ShutdownToken;
|
||||
use std::net::IpAddr;
|
||||
use std::{net::SocketAddr, process, sync::Arc};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -23,15 +23,15 @@ impl State {
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
state: State,
|
||||
task_client: TaskClient,
|
||||
shutdown_token: ShutdownToken,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
|
||||
pub(crate) fn new(host: IpAddr, port: u16, shutdown_token: ShutdownToken) -> Self {
|
||||
Listener {
|
||||
address: SocketAddr::new(host, port),
|
||||
state: State::AwaitingConnection,
|
||||
task_client,
|
||||
shutdown_token,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,11 +46,11 @@ impl Listener {
|
||||
|
||||
let notify = Arc::new(Notify::new());
|
||||
|
||||
while !self.task_client.is_shutdown() {
|
||||
while !self.shutdown_token.is_cancelled() {
|
||||
tokio::select! {
|
||||
// When the handler finishes we check if shutdown is signalled
|
||||
_ = notify.notified() => {
|
||||
if self.task_client.is_shutdown() {
|
||||
if self.shutdown_token.is_cancelled() {
|
||||
log::trace!("Websocket listener: detected shutdown after connection closed");
|
||||
break;
|
||||
}
|
||||
@@ -59,7 +59,7 @@ impl Listener {
|
||||
}
|
||||
// ... but when there is no connected client at the time of shutdown being
|
||||
// signalled, we handle it here.
|
||||
_ = self.task_client.recv() => {
|
||||
_ = self.shutdown_token.cancelled() => {
|
||||
if !self.state.is_connected() {
|
||||
log::trace!("Not connected: shutting down");
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user