Merge branch 'staging' into grim
# Conflicts: # store/src/lmdb.rs
This commit is contained in:
@@ -18,6 +18,7 @@ use grin_p2p as p2p;
|
||||
use grin_util as util;
|
||||
use grin_util::StopState;
|
||||
|
||||
use std::fs;
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time};
|
||||
@@ -44,11 +45,17 @@ fn test_setup() {
|
||||
util::init_test_logger();
|
||||
}
|
||||
|
||||
fn clean_output_dir(dir_name: &str) {
|
||||
let _ = fs::remove_dir_all(dir_name);
|
||||
}
|
||||
|
||||
// Starts a server and connects a client peer to it to check handshake,
|
||||
// followed by a ping/pong exchange to make sure the connection is live.
|
||||
#[test]
|
||||
fn peer_handshake() {
|
||||
test_setup();
|
||||
let test_dir = "target/peer_handshake";
|
||||
clean_output_dir(test_dir);
|
||||
|
||||
let p2p_config = p2p::P2PConfig {
|
||||
host: "127.0.0.1".parse().unwrap(),
|
||||
@@ -60,7 +67,7 @@ fn peer_handshake() {
|
||||
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
||||
let server = Arc::new(
|
||||
p2p::Server::new(
|
||||
".grin",
|
||||
test_dir,
|
||||
p2p::Capabilities::UNKNOWN,
|
||||
p2p_config.clone(),
|
||||
net_adapter.clone(),
|
||||
|
||||
+60
-62
@@ -635,12 +635,9 @@ impl Store {
|
||||
Ok(read) => {
|
||||
let db_res = self.get_db(db_key);
|
||||
match db_res {
|
||||
Ok(db) => Ok(DatabaseIterator::new(
|
||||
self,
|
||||
Arc::new(db.clone()),
|
||||
read,
|
||||
deserialize,
|
||||
)),
|
||||
Ok(db) => {
|
||||
DatabaseIterator::new(self, Arc::new(db.clone()), read, deserialize)
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
@@ -797,12 +794,12 @@ impl<'a> Batch<'a> {
|
||||
Ok(read) => {
|
||||
let db_res = self.store.get_db(db_key);
|
||||
match db_res {
|
||||
Ok(db) => Ok(DatabaseIterator::new(
|
||||
Ok(db) => DatabaseIterator::new(
|
||||
self.store,
|
||||
Arc::new(db.clone()),
|
||||
read,
|
||||
deserialize,
|
||||
)),
|
||||
),
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
@@ -879,9 +876,9 @@ where
|
||||
db: Arc<Database<Bytes, Bytes>>,
|
||||
read: Arc<RoTxn<'a, WithoutTls>>,
|
||||
keys: Vec<Vec<u8>>,
|
||||
total_keys: usize,
|
||||
skip_cur: usize,
|
||||
skip_total: usize,
|
||||
done: bool,
|
||||
deserialize: F,
|
||||
#[allow(dead_code)]
|
||||
tx_counter: TxCounter,
|
||||
@@ -894,44 +891,37 @@ where
|
||||
type Item = Result<T, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
|
||||
self.skip_total += 1;
|
||||
self.skip_cur += 1;
|
||||
match self.db.get(&self.read, k) {
|
||||
Ok(v) => {
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => Some(Ok(v)),
|
||||
Err(e) => {
|
||||
error!("db iter: error deserializing: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return {
|
||||
error!("db iter: error read value: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
loop {
|
||||
if self.done {
|
||||
return None;
|
||||
} else if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
|
||||
self.skip_total += 1;
|
||||
self.skip_cur += 1;
|
||||
match self.db.get(&self.read, k) {
|
||||
Ok(v) => {
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => Some(Ok(v)),
|
||||
Err(e) => {
|
||||
error!("db iter: error deserializing: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return {
|
||||
error!("db iter: error read value: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Err(e) = self.load_next_keys() {
|
||||
error!("db iter: error read keys: {}", e);
|
||||
self.done = true;
|
||||
return Some(Err(e));
|
||||
}
|
||||
} else if self.total_keys > self.skip_total {
|
||||
let keys = if let Ok(iter) = self.db.iter(&self.read) {
|
||||
iter.move_between_keys()
|
||||
.skip(self.skip_total)
|
||||
.take(10000)
|
||||
.filter(|kv| kv.is_ok())
|
||||
.map(|kv| kv.unwrap().0.to_vec())
|
||||
.collect::<Vec<Vec<u8>>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
self.skip_cur = 0;
|
||||
self.keys = keys;
|
||||
return self.next();
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -945,34 +935,42 @@ where
|
||||
db: Arc<Database<Bytes, Bytes>>,
|
||||
read: RoTxn<'a, WithoutTls>,
|
||||
deserialize: F,
|
||||
) -> DatabaseIterator<'a, F, T> {
|
||||
let (keys, total_keys) = if let Ok(iter) = db.iter(&read) {
|
||||
let total = iter.move_between_keys().count();
|
||||
let keys = if let Ok(iter) = db.iter(&read) {
|
||||
iter.move_between_keys()
|
||||
.take(10000)
|
||||
.filter(|kv| kv.is_ok())
|
||||
.map(|kv| kv.unwrap().0.to_vec())
|
||||
.collect::<Vec<Vec<u8>>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
(keys, total)
|
||||
} else {
|
||||
(vec![], 0)
|
||||
};
|
||||
DatabaseIterator {
|
||||
) -> Result<DatabaseIterator<'a, F, T>, Error> {
|
||||
// load keys before constructing tx_counter to avoid double-decrementing open_txs_count on error
|
||||
let keys = Self::read_key_page(&db, &read, 0)?;
|
||||
let done = keys.is_empty();
|
||||
Ok(DatabaseIterator {
|
||||
db,
|
||||
read: Arc::new(read),
|
||||
keys,
|
||||
total_keys,
|
||||
skip_cur: 0,
|
||||
skip_total: 0,
|
||||
done,
|
||||
deserialize,
|
||||
tx_counter: TxCounter {
|
||||
env_path: store.env_path.clone(),
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn load_next_keys(&mut self) -> Result<(), Error> {
|
||||
self.keys = Self::read_key_page(&self.db, &self.read, self.skip_total)?;
|
||||
self.skip_cur = 0;
|
||||
self.done = self.keys.is_empty();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_key_page(
|
||||
db: &Database<Bytes, Bytes>,
|
||||
read: &RoTxn<'a, WithoutTls>,
|
||||
skip: usize,
|
||||
) -> Result<Vec<Vec<u8>>, Error> {
|
||||
let iter = db.iter(read)?;
|
||||
iter.move_between_keys()
|
||||
.skip(skip)
|
||||
.take(10000)
|
||||
.map(|kv| kv.map(|(k, _)| k.to_vec()).map_err(Error::from))
|
||||
.collect::<Result<Vec<Vec<u8>>, Error>>()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -140,6 +140,32 @@ fn test_iter() -> Result<(), store::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_pages() -> Result<(), store::Error> {
|
||||
let test_dir = "target/test_iter_pages";
|
||||
setup(test_dir);
|
||||
|
||||
let prefix = b'P';
|
||||
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None, None)?;
|
||||
|
||||
{
|
||||
let mut batch = store.batch()?;
|
||||
for i in 0..10_001u32 {
|
||||
batch.put(Some(prefix), &i.to_be_bytes(), &[1])?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
let count = store
|
||||
.iter(Some(prefix), |_, v| Ok(v.to_vec()))?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.len();
|
||||
assert_eq!(count, 10_001);
|
||||
|
||||
clean_output_dir(test_dir);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lmdb_allocate() -> Result<(), store::Error> {
|
||||
let test_dir = "target/lmdb_allocate";
|
||||
|
||||
Reference in New Issue
Block a user