appease clippy
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
//! Mix Packet Ingest Worker implementation and tooling
|
||||
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Instant;
|
||||
use std::{io, num::NonZero};
|
||||
|
||||
use crate::node::{
|
||||
key_rotation::active_keys::SphinxKeyGuard,
|
||||
@@ -75,13 +75,17 @@ impl MixPacketIngest {
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
// this one is impossible to ever panic
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let default_worker_count = NonZero::new(1).unwrap();
|
||||
let num_threads = std::thread::available_parallelism()
|
||||
.expect("unable to query available parallelism")
|
||||
.inspect_err(|e| warn!("unable to query available parallelism: {e}"))
|
||||
.unwrap_or(default_worker_count)
|
||||
.get();
|
||||
|
||||
let mut workers = tokio::task::JoinSet::new();
|
||||
|
||||
for i in 0..num_threads {
|
||||
for _ in 0..num_threads {
|
||||
let recv = self.packet_receiver.clone();
|
||||
let worker = MixPacketIngestWorker {
|
||||
packet_receiver: recv,
|
||||
@@ -132,7 +136,7 @@ impl MixPacketIngestWorker {
|
||||
// this one is impossible to ever panic - the parent struct maintains a sender
|
||||
// and waits for this process to end. Therefore it can't happen that ALL senders
|
||||
// are dropped
|
||||
#[allow(clippy::unwrap_used)]
|
||||
#[allow(clippy::expect_used)]
|
||||
let new_packet = new_packet.expect("the ingest receiver closed somehow");
|
||||
self.handle_ingest_packet(new_packet).await;
|
||||
}
|
||||
@@ -155,7 +159,7 @@ impl MixPacketIngestWorker {
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_replay_detection(&mut self, packet: IngressNymPacket) {
|
||||
let source = packet.received_from.clone();
|
||||
let source = packet.received_from;
|
||||
let received_at = packet.received_at;
|
||||
|
||||
// 1. derive and expand shared secret
|
||||
@@ -188,7 +192,7 @@ impl MixPacketIngestWorker {
|
||||
let Ok(replayed) = self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
.check_and_set(rotation_id, &replay_tag)
|
||||
.check_and_set(rotation_id, replay_tag)
|
||||
else {
|
||||
// our mutex got poisoned - we have to shut down
|
||||
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
|
||||
@@ -214,7 +218,7 @@ impl MixPacketIngestWorker {
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_no_replay_detection(&mut self, packet: IngressNymPacket) {
|
||||
let source = packet.received_from.clone();
|
||||
let source = packet.received_from;
|
||||
let received_at = packet.received_at;
|
||||
let unwrapped_packet = self.try_full_unwrap_packet(packet);
|
||||
self.handle_unwrapped_packet(unwrapped_packet, source, received_at)
|
||||
@@ -413,8 +417,7 @@ impl MixPacketIngestWorker {
|
||||
target.saturating_duration_since(received_at).as_millis() as u64,
|
||||
);
|
||||
}
|
||||
self.shared
|
||||
.forward_mix_packet(mix_packet, forward_instant.into());
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
/// Determine instant at which packet should get forwarded to the next hop.
|
||||
|
||||
Reference in New Issue
Block a user