Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 120220517d | |||
| 7677c02494 | |||
| c20c952633 |
@@ -96,24 +96,46 @@ impl MixTrafficController {
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
let send_future = if mix_packets.len() == 1 {
|
||||
// SAFETY: we just checked we have one packet
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let mix_packet = mix_packets.pop().unwrap();
|
||||
self.gateway_transceiver.send_mix_packet(mix_packet).await
|
||||
self.gateway_transceiver.send_mix_packet(mix_packet)
|
||||
} else {
|
||||
self.gateway_transceiver
|
||||
.batch_send_mix_packets(mix_packets)
|
||||
.await
|
||||
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
|
||||
};
|
||||
|
||||
if result.is_err() {
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
} else {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
trace!("received shutdown while handling messages");
|
||||
Ok(())
|
||||
}
|
||||
result = send_future => {
|
||||
if result.is_err() {
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
} else {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
|
||||
result
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_client_request(&mut self, client_request: ClientRequest) {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
trace!("received shutdown while handling client request");
|
||||
}
|
||||
result = self.gateway_transceiver.send_client_request(client_request) => {
|
||||
if let Err(err) = result {
|
||||
error!("Failed to send client request: {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(mut self) {
|
||||
@@ -122,7 +144,11 @@ impl MixTrafficController {
|
||||
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
if let Err(err) = self.on_messages(mix_packets).await {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
@@ -144,19 +170,14 @@ impl MixTrafficController {
|
||||
},
|
||||
client_request = self.client_rx.recv() => match client_request {
|
||||
Some(client_request) => {
|
||||
match self.gateway_transceiver.send_client_request(client_request).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Failed to send client request: {e}"),
|
||||
};
|
||||
self.on_client_request(client_request).await;
|
||||
|
||||
},
|
||||
None => {
|
||||
tracing::trace!("MixTrafficController, client request channel closed");
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
|
||||
+6
-3
@@ -223,6 +223,11 @@ where
|
||||
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("InputMessageListener: Received shutdown");
|
||||
break;
|
||||
}
|
||||
input_msg = self.input_receiver.recv() => match input_msg {
|
||||
Some(input_msg) => {
|
||||
self.on_input_message(input_msg).await;
|
||||
@@ -232,9 +237,7 @@ where
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("InputMessageListener: Received shutdown");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
|
||||
+6
-3
@@ -179,6 +179,11 @@ where
|
||||
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
break;
|
||||
}
|
||||
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
|
||||
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
|
||||
None => {
|
||||
@@ -186,9 +191,7 @@ where
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
tracing::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
|
||||
@@ -535,6 +535,7 @@ where
|
||||
pending_acks.push(pending_ack);
|
||||
}
|
||||
|
||||
drop(topology_permit);
|
||||
self.insert_pending_acks(pending_acks);
|
||||
self.forward_messages(real_messages, lane).await;
|
||||
|
||||
@@ -716,17 +717,21 @@ where
|
||||
|
||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||
pub(crate) async fn forward_messages(
|
||||
&self,
|
||||
&mut self,
|
||||
messages: Vec<RealMessage>,
|
||||
transmission_lane: TransmissionLane,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.real_message_sender
|
||||
.send((messages, transmission_lane))
|
||||
.await
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to forward messages to the real message sender: {err}");
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
trace!("received shutdown while attempting to forward mixnet messages");
|
||||
}
|
||||
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
|
||||
if sending_res.is_err() {
|
||||
error!(
|
||||
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,17 +278,33 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
tracing::error!("Failed to send: {err}");
|
||||
let sending_res = tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
trace!("received shutdown signal while attempting to send mix message");
|
||||
return
|
||||
}
|
||||
sending_res = self.mix_tx.send(vec![next_message]) => {
|
||||
sending_res
|
||||
}
|
||||
};
|
||||
|
||||
match sending_res {
|
||||
Err(_) => {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
tracing::error!(
|
||||
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
let event = if fragment_id.is_some() {
|
||||
PacketStatisticsEvent::RealPacketSent(packet_size)
|
||||
} else {
|
||||
PacketStatisticsEvent::CoverPacketSent(packet_size)
|
||||
};
|
||||
self.stats_tx.report(event.into());
|
||||
}
|
||||
} else {
|
||||
let event = if fragment_id.is_some() {
|
||||
PacketStatisticsEvent::RealPacketSent(packet_size)
|
||||
} else {
|
||||
PacketStatisticsEvent::CoverPacketSent(packet_size)
|
||||
};
|
||||
self.stats_tx.report(event.into());
|
||||
}
|
||||
|
||||
// notify ack controller about sending our message only after we actually managed to push it
|
||||
|
||||
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
|
||||
debug!(
|
||||
"Attemting to establish connection to gateway at: {}",
|
||||
"Attempting to establish connection to gateway at: {}",
|
||||
self.gateway_address
|
||||
);
|
||||
let (ws_stream, _) = connect_async(
|
||||
|
||||
@@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
|
||||
// check if the split stream didn't error out
|
||||
let receive_res = stream_receiver
|
||||
.try_recv()
|
||||
.expect("stream sender was somehow dropped without sending anything!");
|
||||
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;
|
||||
|
||||
if let Some(res) = receive_res {
|
||||
let _res = res?;
|
||||
|
||||
+30
-2
@@ -665,8 +665,7 @@ where
|
||||
CosmWasmClient::broadcast_tx_commit(self, tx_bytes).await
|
||||
}
|
||||
|
||||
/// Broadcast a transaction to the network and monitors its inclusion in a block.
|
||||
async fn sign_and_broadcast(
|
||||
async fn sign_and_broadcast_inner(
|
||||
&self,
|
||||
signer_address: &AccountId,
|
||||
messages: Vec<Any>,
|
||||
@@ -686,6 +685,35 @@ where
|
||||
self.broadcast_tx(tx_bytes, None, None).await
|
||||
}
|
||||
|
||||
/// Broadcast a transaction to the network and monitors its inclusion in a block.
|
||||
async fn sign_and_broadcast(
|
||||
&self,
|
||||
signer_address: &AccountId,
|
||||
messages: Vec<Any>,
|
||||
fee: Fee,
|
||||
memo: impl Into<String> + Send + 'static,
|
||||
) -> Result<TxResponse, NyxdError> {
|
||||
let memo = memo.into();
|
||||
loop {
|
||||
match self
|
||||
.sign_and_broadcast_inner(
|
||||
signer_address,
|
||||
messages.clone(),
|
||||
fee.clone(),
|
||||
memo.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => return Ok(res),
|
||||
Err(err) => {
|
||||
if !err.to_string().contains("sequence") {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn sign(
|
||||
&self,
|
||||
signer_address: &AccountId,
|
||||
|
||||
@@ -10,6 +10,7 @@ where
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[track_caller]
|
||||
pub fn spawn<F>(future: F)
|
||||
where
|
||||
F: Future + Send + 'static,
|
||||
@@ -18,6 +19,7 @@ where
|
||||
tokio::spawn(future);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn spawn_with_report_error<F, T, E>(future: F, mut shutdown: TaskClient)
|
||||
where
|
||||
F: Future<Output = Result<T, E>> + Send + 'static,
|
||||
|
||||
+1
-1
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node"
|
||||
version = "1.15.0"
|
||||
version = "1.15.1"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
Reference in New Issue
Block a user