Compare commits

...

1 Commits

Author SHA1 Message Date
Jon Häggblad 21f1fa94de wip 2025-03-21 17:23:56 +01:00
2 changed files with 41 additions and 9 deletions
+8
View File
@@ -23,12 +23,14 @@ const LENGTH_PREFIX_SIZE: usize = 2;
// long for the buffer to fill up, since this kills latency.
pub struct MultiIpPacketCodec {
buffer: BytesMut,
pub counter: u64,
}
impl MultiIpPacketCodec {
pub fn new() -> Self {
MultiIpPacketCodec {
buffer: BytesMut::new(),
counter: 0,
}
}
@@ -89,6 +91,8 @@ impl Encoder<IprPacket> for MultiIpPacketCodec {
let packet = match packet {
IprPacket::Flush => {
dst.extend_from_slice(&self.buffer);
self.counter += 1;
println!("Encoding packet: {}", self.counter);
self.buffer = BytesMut::new();
return Ok(());
}
@@ -103,6 +107,8 @@ impl Encoder<IprPacket> for MultiIpPacketCodec {
dst.extend_from_slice(&(packet_size as u16).to_be_bytes());
// Add the packet to the buffer
dst.extend_from_slice(&packet);
self.counter += 1;
println!("Encoding packet: {}", self.counter);
return Ok(());
}
@@ -111,6 +117,8 @@ impl Encoder<IprPacket> for MultiIpPacketCodec {
if self.buffer.len() + packet_size + LENGTH_PREFIX_SIZE > MAX_PACKET_SIZE {
// Send the existing buffer
dst.extend_from_slice(&self.buffer);
self.counter += 1;
println!("Encoding packet: {}", self.counter);
// Start a new buffer
self.buffer = BytesMut::new();
}
+33 -9
View File
@@ -93,6 +93,9 @@ where
// The handle for the mixnet sender task
send_task: JoinHandle<()>,
poll_counter: u64,
counter: u64,
}
impl MixnetMessageSink<DefaultMixnetMessageSinkTranslator> {
@@ -171,6 +174,8 @@ where
message_translator,
tx,
send_task,
poll_counter: 0,
counter: 0,
}
}
@@ -183,7 +188,10 @@ where
let (tx, mut rx) = mpsc::channel(SINK_BUFFER_SIZE_IN_MESSAGES);
let send_task = tokio::spawn(async move {
let mut counter = 0u64;
while let Some(input_message) = rx.recv().await {
counter += 1;
println!("sink sender counter: {}", counter);
if let Err(err) = mixnet_client_sender.send(input_message).await {
log::error!("failed to send packet to mixnet: {err}");
}
@@ -212,20 +220,33 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
ready!(self.tx.poll_ready_unpin(cx)).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "failed to send packet to mixnet")
})?;
self.poll_counter += 1;
println!("poll_write counter: {}", self.poll_counter);
ready!(self.tx.poll_ready_unpin(cx))
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "failed to send packet to mixnet")
})
.inspect_err(|err| println!("failed to poll_ready_unpin: {err}"))?;
self.counter += 1;
println!("poll_write success counter: {}", self.counter);
let input_message = self
.message_translator
.to_input_message(buf)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.inspect_err(|err| println!("failed to convert bytes to input message: {err}"))?;
// Pass it to the mixnet sender
self.tx.start_send_unpin(input_message).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "failed to send packet to mixnet")
})?;
self.tx
.start_send_unpin(input_message)
.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "failed to send packet to mixnet")
})
.inspect_err(|err| println!("failed to start_send_unpin: {err}"))?;
println!("poll_write success");
Poll::Ready(Ok(buf.len()))
}
@@ -240,9 +261,12 @@ where
}
fn poll_shutdown(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<std::result::Result<(), std::io::Error>> {
self.poll_flush(cx)
// self.poll_flush(cx)
ready!(self.tx.poll_close_unpin(cx))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "failed to close"))?;
Poll::Ready(Ok(()))
}
}