Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f17795d8cc | |||
| dda337a8a1 | |||
| 7f1e19ebe0 | |||
| 81f6c1b52b | |||
| b367432963 | |||
| 6cda24a7a9 |
@@ -21,6 +21,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
### Fixed
|
||||
|
||||
- validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645])
|
||||
- socks5-client: fix bug where in some cases packet reordering could trigger a connection being closed too early ([#1702])
|
||||
|
||||
### Changed
|
||||
|
||||
@@ -43,6 +44,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
[#1671]: https://github.com/nymtech/nym/pull/1671
|
||||
[#1673]: https://github.com/nymtech/nym/pull/1673
|
||||
[#1687]: https://github.com/nymtech/nym/pull/1687
|
||||
[#1702]: https://github.com/nymtech/nym/pull/1702
|
||||
|
||||
|
||||
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
||||
|
||||
@@ -13,6 +13,13 @@ pub struct OrderedMessageBuffer {
|
||||
messages: HashMap<u64, OrderedMessage>,
|
||||
}
|
||||
|
||||
/// Data returned from `OrderedMessageBuffer` on a successful read of gapless ordered data.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct ReadContiguousData {
|
||||
pub data: Vec<u8>,
|
||||
pub last_index: u64,
|
||||
}
|
||||
|
||||
impl OrderedMessageBuffer {
|
||||
pub fn new() -> OrderedMessageBuffer {
|
||||
OrderedMessageBuffer {
|
||||
@@ -42,7 +49,7 @@ impl OrderedMessageBuffer {
|
||||
/// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
|
||||
/// return `None` until message 3 comes in, at which point 3, 4, and any
|
||||
/// further contiguous messages which have arrived will be returned.
|
||||
pub fn read(&mut self) -> Option<Vec<u8>> {
|
||||
pub fn read(&mut self) -> Option<ReadContiguousData> {
|
||||
if !self.messages.contains_key(&self.next_index) {
|
||||
return None;
|
||||
}
|
||||
@@ -66,7 +73,10 @@ impl OrderedMessageBuffer {
|
||||
.collect();
|
||||
|
||||
trace!("Returning {} bytes from ordered message buffer", data.len());
|
||||
Some(data)
|
||||
Some(ReadContiguousData {
|
||||
data,
|
||||
last_index: index,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,11 +112,11 @@ mod test_chunking_and_reassembling {
|
||||
};
|
||||
|
||||
buffer.write(first_message);
|
||||
let first_read = buffer.read().unwrap();
|
||||
let first_read = buffer.read().unwrap().data;
|
||||
assert_eq!(vec![1, 2, 3, 4], first_read);
|
||||
|
||||
buffer.write(second_message);
|
||||
let second_read = buffer.read().unwrap();
|
||||
let second_read = buffer.read().unwrap().data;
|
||||
assert_eq!(vec![5, 6, 7, 8], second_read);
|
||||
|
||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||
@@ -128,7 +138,7 @@ mod test_chunking_and_reassembling {
|
||||
buffer.write(first_message);
|
||||
buffer.write(second_message);
|
||||
let second_read = buffer.read();
|
||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap());
|
||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data);
|
||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||
}
|
||||
|
||||
@@ -147,8 +157,8 @@ mod test_chunking_and_reassembling {
|
||||
|
||||
buffer.write(second_message);
|
||||
buffer.write(first_message);
|
||||
let read = buffer.read();
|
||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read.unwrap());
|
||||
let read = buffer.read().unwrap().data;
|
||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read);
|
||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||
}
|
||||
}
|
||||
@@ -182,7 +192,7 @@ mod test_chunking_and_reassembling {
|
||||
#[test]
|
||||
fn everything_up_to_the_indexing_gap_is_returned() {
|
||||
let mut buffer = setup();
|
||||
let ordered_bytes = buffer.read().unwrap();
|
||||
let ordered_bytes = buffer.read().unwrap().data;
|
||||
assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);
|
||||
|
||||
// we shouldn't get any more from a second attempt if nothing is added
|
||||
@@ -208,7 +218,7 @@ mod test_chunking_and_reassembling {
|
||||
};
|
||||
buffer.write(two_message);
|
||||
|
||||
let more_ordered_bytes = buffer.read().unwrap();
|
||||
let more_ordered_bytes = buffer.read().unwrap().data;
|
||||
assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);
|
||||
|
||||
// let's add another message
|
||||
@@ -227,7 +237,10 @@ mod test_chunking_and_reassembling {
|
||||
};
|
||||
buffer.write(four_message);
|
||||
|
||||
assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), buffer.read().unwrap());
|
||||
assert_eq!(
|
||||
[4, 4, 4, 4, 5, 5, 5, 5].to_vec(),
|
||||
buffer.read().unwrap().data
|
||||
);
|
||||
|
||||
// at this point we should again get back nothing if we try a read
|
||||
assert_eq!(None, buffer.read());
|
||||
|
||||
@@ -2,7 +2,7 @@ mod buffer;
|
||||
mod message;
|
||||
mod sender;
|
||||
|
||||
pub use buffer::OrderedMessageBuffer;
|
||||
pub use buffer::{OrderedMessageBuffer, ReadContiguousData};
|
||||
pub use message::MessageError;
|
||||
pub use message::OrderedMessage;
|
||||
pub use sender::OrderedMessageSender;
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer};
|
||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||
use socks5_requests::ConnectionId;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use task::ShutdownListener;
|
||||
@@ -38,12 +38,13 @@ pub enum ControllerCommand {
|
||||
|
||||
struct ActiveConnection {
|
||||
is_closed: bool,
|
||||
closed_at_index: Option<u64>,
|
||||
connection_sender: Option<ConnectionSender>,
|
||||
ordered_buffer: OrderedMessageBuffer,
|
||||
}
|
||||
|
||||
impl ActiveConnection {
|
||||
fn write_to_buf(&mut self, payload: Vec<u8>) {
|
||||
fn write_to_buf(&mut self, payload: Vec<u8>, is_closed: bool) {
|
||||
let ordered_message = match OrderedMessage::try_from_bytes(payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
@@ -51,10 +52,13 @@ impl ActiveConnection {
|
||||
return;
|
||||
}
|
||||
};
|
||||
if is_closed {
|
||||
self.closed_at_index = Some(ordered_message.index);
|
||||
}
|
||||
self.ordered_buffer.write(ordered_message);
|
||||
}
|
||||
|
||||
fn read_from_buf(&mut self) -> Option<Vec<u8>> {
|
||||
fn read_from_buf(&mut self) -> Option<ReadContiguousData> {
|
||||
self.ordered_buffer.read()
|
||||
}
|
||||
}
|
||||
@@ -99,6 +103,7 @@ impl Controller {
|
||||
is_closed: false,
|
||||
connection_sender: Some(connection_sender),
|
||||
ordered_buffer: OrderedMessageBuffer::new(),
|
||||
closed_at_index: None,
|
||||
};
|
||||
if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) {
|
||||
error!("Received a duplicate 'Connect'!")
|
||||
@@ -127,21 +132,23 @@ impl Controller {
|
||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
|
||||
if !payload.is_empty() {
|
||||
active_connection.write_to_buf(payload);
|
||||
active_connection.write_to_buf(payload, is_closed);
|
||||
} else if !is_closed {
|
||||
error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message");
|
||||
}
|
||||
// if messages get unordered, make sure we don't lose information about
|
||||
// remote socket getting closed!
|
||||
active_connection.is_closed |= is_closed;
|
||||
|
||||
if let Some(payload) = active_connection.read_from_buf() {
|
||||
if let Some(closed_at_index) = active_connection.closed_at_index {
|
||||
if payload.last_index > closed_at_index {
|
||||
active_connection.is_closed = true;
|
||||
}
|
||||
}
|
||||
if let Err(err) = active_connection
|
||||
.connection_sender
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.unbounded_send(ConnectionMessage {
|
||||
payload,
|
||||
payload: payload.data,
|
||||
socket_closed: active_connection.is_closed,
|
||||
})
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user