Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f17795d8cc | |||
| dda337a8a1 | |||
| 7f1e19ebe0 | |||
| 81f6c1b52b | |||
| b367432963 | |||
| 6cda24a7a9 |
@@ -21,6 +21,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
|||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645])
|
- validator-api, mixnode, gateway should now prefer values in config.toml over mainnet defaults ([#1645])
|
||||||
|
- socks5-client: fix bug where in some cases packet reordering could trigger a connection being closed too early ([#1702])
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
@@ -43,6 +44,7 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
|||||||
[#1671]: https://github.com/nymtech/nym/pull/1671
|
[#1671]: https://github.com/nymtech/nym/pull/1671
|
||||||
[#1673]: https://github.com/nymtech/nym/pull/1673
|
[#1673]: https://github.com/nymtech/nym/pull/1673
|
||||||
[#1687]: https://github.com/nymtech/nym/pull/1687
|
[#1687]: https://github.com/nymtech/nym/pull/1687
|
||||||
|
[#1702]: https://github.com/nymtech/nym/pull/1702
|
||||||
|
|
||||||
|
|
||||||
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
## [nym-binaries-1.0.2](https://github.com/nymtech/nym/tree/nym-binaries-1.0.2)
|
||||||
|
|||||||
@@ -13,6 +13,13 @@ pub struct OrderedMessageBuffer {
|
|||||||
messages: HashMap<u64, OrderedMessage>,
|
messages: HashMap<u64, OrderedMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Data returned from `OrderedMessageBuffer` on a successful read of gapless ordered data.
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
pub struct ReadContiguousData {
|
||||||
|
pub data: Vec<u8>,
|
||||||
|
pub last_index: u64,
|
||||||
|
}
|
||||||
|
|
||||||
impl OrderedMessageBuffer {
|
impl OrderedMessageBuffer {
|
||||||
pub fn new() -> OrderedMessageBuffer {
|
pub fn new() -> OrderedMessageBuffer {
|
||||||
OrderedMessageBuffer {
|
OrderedMessageBuffer {
|
||||||
@@ -42,7 +49,7 @@ impl OrderedMessageBuffer {
|
|||||||
/// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
|
/// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
|
||||||
/// return `None` until message 3 comes in, at which point 3, 4, and any
|
/// return `None` until message 3 comes in, at which point 3, 4, and any
|
||||||
/// further contiguous messages which have arrived will be returned.
|
/// further contiguous messages which have arrived will be returned.
|
||||||
pub fn read(&mut self) -> Option<Vec<u8>> {
|
pub fn read(&mut self) -> Option<ReadContiguousData> {
|
||||||
if !self.messages.contains_key(&self.next_index) {
|
if !self.messages.contains_key(&self.next_index) {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -66,7 +73,10 @@ impl OrderedMessageBuffer {
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
trace!("Returning {} bytes from ordered message buffer", data.len());
|
trace!("Returning {} bytes from ordered message buffer", data.len());
|
||||||
Some(data)
|
Some(ReadContiguousData {
|
||||||
|
data,
|
||||||
|
last_index: index,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,11 +112,11 @@ mod test_chunking_and_reassembling {
|
|||||||
};
|
};
|
||||||
|
|
||||||
buffer.write(first_message);
|
buffer.write(first_message);
|
||||||
let first_read = buffer.read().unwrap();
|
let first_read = buffer.read().unwrap().data;
|
||||||
assert_eq!(vec![1, 2, 3, 4], first_read);
|
assert_eq!(vec![1, 2, 3, 4], first_read);
|
||||||
|
|
||||||
buffer.write(second_message);
|
buffer.write(second_message);
|
||||||
let second_read = buffer.read().unwrap();
|
let second_read = buffer.read().unwrap().data;
|
||||||
assert_eq!(vec![5, 6, 7, 8], second_read);
|
assert_eq!(vec![5, 6, 7, 8], second_read);
|
||||||
|
|
||||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||||
@@ -128,7 +138,7 @@ mod test_chunking_and_reassembling {
|
|||||||
buffer.write(first_message);
|
buffer.write(first_message);
|
||||||
buffer.write(second_message);
|
buffer.write(second_message);
|
||||||
let second_read = buffer.read();
|
let second_read = buffer.read();
|
||||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap());
|
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data);
|
||||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,8 +157,8 @@ mod test_chunking_and_reassembling {
|
|||||||
|
|
||||||
buffer.write(second_message);
|
buffer.write(second_message);
|
||||||
buffer.write(first_message);
|
buffer.write(first_message);
|
||||||
let read = buffer.read();
|
let read = buffer.read().unwrap().data;
|
||||||
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read.unwrap());
|
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read);
|
||||||
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -182,7 +192,7 @@ mod test_chunking_and_reassembling {
|
|||||||
#[test]
|
#[test]
|
||||||
fn everything_up_to_the_indexing_gap_is_returned() {
|
fn everything_up_to_the_indexing_gap_is_returned() {
|
||||||
let mut buffer = setup();
|
let mut buffer = setup();
|
||||||
let ordered_bytes = buffer.read().unwrap();
|
let ordered_bytes = buffer.read().unwrap().data;
|
||||||
assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);
|
assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);
|
||||||
|
|
||||||
// we shouldn't get any more from a second attempt if nothing is added
|
// we shouldn't get any more from a second attempt if nothing is added
|
||||||
@@ -208,7 +218,7 @@ mod test_chunking_and_reassembling {
|
|||||||
};
|
};
|
||||||
buffer.write(two_message);
|
buffer.write(two_message);
|
||||||
|
|
||||||
let more_ordered_bytes = buffer.read().unwrap();
|
let more_ordered_bytes = buffer.read().unwrap().data;
|
||||||
assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);
|
assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);
|
||||||
|
|
||||||
// let's add another message
|
// let's add another message
|
||||||
@@ -227,7 +237,10 @@ mod test_chunking_and_reassembling {
|
|||||||
};
|
};
|
||||||
buffer.write(four_message);
|
buffer.write(four_message);
|
||||||
|
|
||||||
assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), buffer.read().unwrap());
|
assert_eq!(
|
||||||
|
[4, 4, 4, 4, 5, 5, 5, 5].to_vec(),
|
||||||
|
buffer.read().unwrap().data
|
||||||
|
);
|
||||||
|
|
||||||
// at this point we should again get back nothing if we try a read
|
// at this point we should again get back nothing if we try a read
|
||||||
assert_eq!(None, buffer.read());
|
assert_eq!(None, buffer.read());
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ mod buffer;
|
|||||||
mod message;
|
mod message;
|
||||||
mod sender;
|
mod sender;
|
||||||
|
|
||||||
pub use buffer::OrderedMessageBuffer;
|
pub use buffer::{OrderedMessageBuffer, ReadContiguousData};
|
||||||
pub use message::MessageError;
|
pub use message::MessageError;
|
||||||
pub use message::OrderedMessage;
|
pub use message::OrderedMessage;
|
||||||
pub use sender::OrderedMessageSender;
|
pub use sender::OrderedMessageSender;
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use log::*;
|
use log::*;
|
||||||
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer};
|
use ordered_buffer::{OrderedMessage, OrderedMessageBuffer, ReadContiguousData};
|
||||||
use socks5_requests::ConnectionId;
|
use socks5_requests::ConnectionId;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use task::ShutdownListener;
|
use task::ShutdownListener;
|
||||||
@@ -38,12 +38,13 @@ pub enum ControllerCommand {
|
|||||||
|
|
||||||
struct ActiveConnection {
|
struct ActiveConnection {
|
||||||
is_closed: bool,
|
is_closed: bool,
|
||||||
|
closed_at_index: Option<u64>,
|
||||||
connection_sender: Option<ConnectionSender>,
|
connection_sender: Option<ConnectionSender>,
|
||||||
ordered_buffer: OrderedMessageBuffer,
|
ordered_buffer: OrderedMessageBuffer,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActiveConnection {
|
impl ActiveConnection {
|
||||||
fn write_to_buf(&mut self, payload: Vec<u8>) {
|
fn write_to_buf(&mut self, payload: Vec<u8>, is_closed: bool) {
|
||||||
let ordered_message = match OrderedMessage::try_from_bytes(payload) {
|
let ordered_message = match OrderedMessage::try_from_bytes(payload) {
|
||||||
Ok(msg) => msg,
|
Ok(msg) => msg,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@@ -51,10 +52,13 @@ impl ActiveConnection {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
if is_closed {
|
||||||
|
self.closed_at_index = Some(ordered_message.index);
|
||||||
|
}
|
||||||
self.ordered_buffer.write(ordered_message);
|
self.ordered_buffer.write(ordered_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_from_buf(&mut self) -> Option<Vec<u8>> {
|
fn read_from_buf(&mut self) -> Option<ReadContiguousData> {
|
||||||
self.ordered_buffer.read()
|
self.ordered_buffer.read()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -99,6 +103,7 @@ impl Controller {
|
|||||||
is_closed: false,
|
is_closed: false,
|
||||||
connection_sender: Some(connection_sender),
|
connection_sender: Some(connection_sender),
|
||||||
ordered_buffer: OrderedMessageBuffer::new(),
|
ordered_buffer: OrderedMessageBuffer::new(),
|
||||||
|
closed_at_index: None,
|
||||||
};
|
};
|
||||||
if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) {
|
if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) {
|
||||||
error!("Received a duplicate 'Connect'!")
|
error!("Received a duplicate 'Connect'!")
|
||||||
@@ -127,21 +132,23 @@ impl Controller {
|
|||||||
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
fn send_to_connection(&mut self, conn_id: ConnectionId, payload: Vec<u8>, is_closed: bool) {
|
||||||
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
|
if let Some(active_connection) = self.active_connections.get_mut(&conn_id) {
|
||||||
if !payload.is_empty() {
|
if !payload.is_empty() {
|
||||||
active_connection.write_to_buf(payload);
|
active_connection.write_to_buf(payload, is_closed);
|
||||||
} else if !is_closed {
|
} else if !is_closed {
|
||||||
error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message");
|
error!("Tried to write an empty message to a not-closing connection. Please let us know if you see this message");
|
||||||
}
|
}
|
||||||
// if messages get unordered, make sure we don't lose information about
|
|
||||||
// remote socket getting closed!
|
|
||||||
active_connection.is_closed |= is_closed;
|
|
||||||
|
|
||||||
if let Some(payload) = active_connection.read_from_buf() {
|
if let Some(payload) = active_connection.read_from_buf() {
|
||||||
|
if let Some(closed_at_index) = active_connection.closed_at_index {
|
||||||
|
if payload.last_index > closed_at_index {
|
||||||
|
active_connection.is_closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
if let Err(err) = active_connection
|
if let Err(err) = active_connection
|
||||||
.connection_sender
|
.connection_sender
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unbounded_send(ConnectionMessage {
|
.unbounded_send(ConnectionMessage {
|
||||||
payload,
|
payload: payload.data,
|
||||||
socket_closed: active_connection.is_closed,
|
socket_closed: active_connection.is_closed,
|
||||||
})
|
})
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user