Merge branch 'lmdb_update' into grim

This commit is contained in:
ardocrat
2026-04-27 22:28:14 +03:00
7 changed files with 328 additions and 266 deletions
+59 -53
View File
@@ -18,10 +18,11 @@ use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
use crate::store::Batch;
use crate::types::CommitPos;
use crate::util::secp::pedersen::Commitment;
use byteorder::{BigEndian, WriteBytesExt};
use enum_primitive::FromPrimitive;
use grin_store as store;
use std::marker::PhantomData;
use store::{to_key, to_key_u64, Error};
use store::Error;
enum_from_primitive! {
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -74,28 +75,24 @@ pub trait ListIndex {
/// List entry type
type Entry: ListIndexEntry;
/// Construct a key for the list.
fn list_key(&self, commit: Commitment) -> Vec<u8>;
/// Construct a key for an individual entry in the list.
fn entry_key(&self, commit: Commitment, pos: u64) -> Vec<u8>;
fn entry_key(&self, commit: Commitment, pos: u64) -> (Option<u8>, Vec<u8>);
/// Returns either a "Single" with embedded "pos" or a "list" with "head" and "tail".
/// Key is "prefix|commit".
/// Note the key for an individual entry in the list is "prefix|commit|pos".
fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result<Option<Self::List>, Error> {
batch.db.get_ser(&self.list_key(commit), None)
}
/// Key is "commit".
/// Note the key for an individual entry in the list is "commit|pos".
fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result<Option<Self::List>, Error>;
/// Returns one of "head", "tail" or "middle" entry variants.
/// Key is "prefix|commit|pos".
/// Key is "commit|pos".
fn get_entry(
&self,
batch: &Batch<'_>,
commit: Commitment,
pos: u64,
) -> Result<Option<Self::Entry>, Error> {
batch.db.get_ser(&self.entry_key(commit, pos), None)
let (db_key, key) = self.entry_key(commit, pos);
batch.db.get_ser(db_key, &key, None)
}
/// Peek the head of the list for the specified commitment.
@@ -243,12 +240,17 @@ where
type List = ListWrapper<T>;
type Entry = ListEntry<T>;
fn list_key(&self, commit: Commitment) -> Vec<u8> {
to_key(self.list_prefix, &mut commit.as_ref().to_vec())
fn entry_key(&self, commit: Commitment, pos: u64) -> (Option<u8>, Vec<u8>) {
let mut key = commit.as_ref().to_vec();
key.write_u64::<BigEndian>(pos).unwrap();
(Some(self.entry_prefix), key)
}
fn entry_key(&self, commit: Commitment, pos: u64) -> Vec<u8> {
to_key_u64(self.entry_prefix, &mut commit.as_ref().to_vec(), pos)
fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result<Option<Self::List>, Error> {
let list_key = (Some(self.list_prefix), commit.as_ref());
batch
.db
.get_ser::<ListWrapper<T>>(list_key.0, list_key.1, None)
}
fn peek_pos(&self, batch: &Batch<'_>, commit: Commitment) -> Result<Option<T>, Error> {
@@ -266,10 +268,11 @@ where
}
fn push_pos(&self, batch: &mut Batch<'_>, commit: Commitment, new_pos: T) -> Result<(), Error> {
let list_key = (Some(self.list_prefix), commit.as_ref());
match self.get_list(batch, commit)? {
None => {
let list = ListWrapper::Single { pos: new_pos };
batch.db.put_ser(&self.list_key(commit), &list)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
}
Some(ListWrapper::Single { pos: current_pos }) => {
if new_pos.pos() <= current_pos.pos() {
@@ -288,13 +291,11 @@ where
head: new_pos.pos(),
tail: current_pos.pos(),
};
batch
.db
.put_ser(&self.entry_key(commit, new_pos.pos()), &head)?;
batch
.db
.put_ser(&self.entry_key(commit, current_pos.pos()), &tail)?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (new_pos_db_key, new_pos_key) = self.entry_key(commit, new_pos.pos());
batch.db.put_ser(new_pos_db_key, &new_pos_key, &head)?;
let (cur_pos_db_key, cur_pos_key) = self.entry_key(commit, current_pos.pos());
batch.db.put_ser(cur_pos_db_key, &cur_pos_key, &tail)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
}
Some(ListWrapper::Multi { head, tail }) => {
if new_pos.pos() <= head {
@@ -319,13 +320,11 @@ where
head: new_pos.pos(),
tail,
};
batch
.db
.put_ser(&self.entry_key(commit, new_pos.pos()), &head)?;
batch
.db
.put_ser(&self.entry_key(commit, current_pos.pos()), &middle)?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (new_pos_db_key, new_pos_key) = self.entry_key(commit, new_pos.pos());
batch.db.put_ser(new_pos_db_key, &new_pos_key, &head)?;
let (cur_pos_db_key, cur_pos_key) = self.entry_key(commit, current_pos.pos());
batch.db.put_ser(cur_pos_db_key, &cur_pos_key, &middle)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
} else {
return Err(Error::OtherErr("expected head to be head variant".into()));
}
@@ -338,10 +337,11 @@ where
/// Returns the output_pos.
/// Returns None if list was empty.
fn pop_pos(&self, batch: &mut Batch<'_>, commit: Commitment) -> Result<Option<T>, Error> {
let list_key = (Some(self.list_prefix), commit.as_ref());
match self.get_list(batch, commit)? {
None => Ok(None),
Some(ListWrapper::Single { pos }) => {
batch.delete(&self.list_key(commit))?;
batch.delete(list_key.0, list_key.1)?;
Ok(Some(pos))
}
Some(ListWrapper::Multi { head, tail }) => {
@@ -357,17 +357,20 @@ where
head: pos.pos(),
tail,
};
batch.delete(&self.entry_key(commit, current_pos.pos()))?;
batch
.db
.put_ser(&self.entry_key(commit, pos.pos()), &head)?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (cur_pos_db_key, cur_pos_key) =
self.entry_key(commit, current_pos.pos());
batch.delete(cur_pos_db_key, &cur_pos_key)?;
let (pos_db_key, pos_key) = self.entry_key(commit, pos.pos());
batch.db.put_ser(pos_db_key, &pos_key, &head)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
Ok(Some(current_pos))
}
Some(ListEntry::Tail { pos, .. }) => {
let list = ListWrapper::Single { pos };
batch.delete(&self.entry_key(commit, current_pos.pos()))?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (cur_pos_db_key, cur_pos_key) =
self.entry_key(commit, current_pos.pos());
batch.delete(cur_pos_db_key, &cur_pos_key)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
Ok(Some(current_pos))
}
Some(_) => Err(Error::OtherErr("next was unexpected".into())),
@@ -404,14 +407,14 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
fn clear(&self, batch: &mut Batch<'_>) -> Result<(), Error> {
let mut list_count = 0;
let mut entry_count = 0;
let prefix = to_key(self.list_prefix, "");
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
let list_db_key = Some(self.list_prefix);
for key in batch.db.iter(list_db_key, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(list_db_key, &key);
list_count += 1;
}
let prefix = to_key(self.entry_prefix, "");
for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(&key);
let entry_db_key = Some(self.entry_prefix);
for key in batch.db.iter(entry_db_key, |k, _| Ok(k.to_vec()))? {
let _ = batch.delete(entry_db_key, &key);
entry_count += 1;
}
debug!(
@@ -436,10 +439,11 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
/// Pop off the back/tail of the linked list.
/// Used when pruning old data.
fn pop_pos_back(&self, batch: &mut Batch<'_>, commit: Commitment) -> Result<Option<T>, Error> {
let list_key = (Some(self.list_prefix), commit.as_ref());
match self.get_list(batch, commit)? {
None => Ok(None),
Some(ListWrapper::Single { pos }) => {
batch.delete(&self.list_key(commit))?;
batch.delete(list_key.0, list_key.1)?;
Ok(Some(pos))
}
Some(ListWrapper::Multi { head, tail }) => {
@@ -455,17 +459,19 @@ impl<T: PosEntry> PruneableListIndex for MultiIndex<T> {
head,
tail: pos.pos(),
};
batch.delete(&self.entry_key(commit, current_pos.pos()))?;
batch
.db
.put_ser(&self.entry_key(commit, pos.pos()), &tail)?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (cur_pos_db_key, cur_pos_key) =
self.entry_key(commit, current_pos.pos());
batch.delete(cur_pos_db_key, &cur_pos_key)?;
let (pos_db_key, pos_key) = self.entry_key(commit, pos.pos());
batch.db.put_ser(pos_db_key, &pos_key, &tail)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
Ok(Some(current_pos))
}
Some(ListEntry::Head { pos, .. }) => {
let list = ListWrapper::Single { pos };
batch.delete(&self.entry_key(commit, current_pos.pos()))?;
batch.db.put_ser(&self.list_key(commit), &list)?;
let (pos_db_key, pos_key) = self.entry_key(commit, current_pos.pos());
batch.delete(pos_db_key, &pos_key)?;
batch.db.put_ser(list_key.0, list_key.1, &list)?;
Ok(Some(current_pos))
}
Some(_) => Err(Error::OtherErr("prev was unexpected".into())),
+87 -56
View File
@@ -26,7 +26,7 @@ use crate::util::secp::pedersen::Commitment;
use croaring::Bitmap;
use grin_core::ser;
use grin_store as store;
use grin_store::{option_to_not_found, to_key, Error};
use grin_store::{option_to_not_found, Error};
use std::convert::TryInto;
use std::sync::Arc;
@@ -38,7 +38,8 @@ const HEAD_PREFIX: u8 = b'H';
const TAIL_PREFIX: u8 = b'T';
const PIBD_HEAD_PREFIX: u8 = b'I';
const HEADER_HEAD_PREFIX: u8 = b'G';
const OUTPUT_POS_PREFIX: u8 = b'p';
/// Prefix for output pos index.
pub const OUTPUT_POS_PREFIX: u8 = b'p';
/// Prefix for NRD kernel pos index lists.
pub const NRD_KERNEL_LIST_PREFIX: u8 = b'K';
@@ -48,6 +49,17 @@ pub const NRD_KERNEL_ENTRY_PREFIX: u8 = b'k';
const BLOCK_SUMS_PREFIX: u8 = b'M';
const BLOCK_SPENT_PREFIX: u8 = b'S';
/// All database prefixes.
const DB_PREFIXES: [u8; 7] = [
BLOCK_HEADER_PREFIX,
BLOCK_PREFIX,
OUTPUT_POS_PREFIX,
NRD_KERNEL_LIST_PREFIX,
NRD_KERNEL_ENTRY_PREFIX,
BLOCK_SUMS_PREFIX,
BLOCK_SPENT_PREFIX,
];
/// All chain-related database operations
pub struct ChainStore {
db: store::Store,
@@ -56,30 +68,40 @@ pub struct ChainStore {
impl ChainStore {
/// Create new chain store
pub fn new(db_root: &str) -> Result<ChainStore, Error> {
let db = store::Store::new(db_root, None, Some(STORE_SUBPATH), None)?;
let db = store::Store::new(
db_root,
None,
Some(STORE_SUBPATH),
DB_PREFIXES.to_vec(),
None,
)?;
Ok(ChainStore { db })
}
/// The current chain head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEAD_PREFIX], None), || "HEAD".to_owned())
option_to_not_found(self.db.get_ser(None, &[HEAD_PREFIX], None), || {
"HEAD".to_owned()
})
}
/// The current header head (may differ from chain head).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEADER_HEAD_PREFIX], None), || {
option_to_not_found(self.db.get_ser(None, &[HEADER_HEAD_PREFIX], None), || {
"HEADER_HEAD".to_owned()
})
}
/// The current chain "tail" (earliest block in the store).
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned())
option_to_not_found(self.db.get_ser(None, &[TAIL_PREFIX], None), || {
"TAIL".to_owned()
})
}
/// The current PIBD head (will differ from the other heads. Return genesis block if PIBD head doesn't exist).
pub fn pibd_head(&self) -> Result<Tip, Error> {
let res = option_to_not_found(self.db.get_ser(&[PIBD_HEAD_PREFIX], None), || {
let res = option_to_not_found(self.db.get_ser(None, &[PIBD_HEAD_PREFIX], None), || {
"PIBD_HEAD".to_owned()
});
@@ -96,21 +118,23 @@ impl ChainStore {
/// Get full block.
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || {
format!("BLOCK: {}", h)
})
option_to_not_found(
self.db.get_ser(Some(BLOCK_PREFIX), h.as_ref(), None),
|| format!("BLOCK: {}", h),
)
}
/// Does this full block exist?
pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
self.db.exists(&to_key(BLOCK_PREFIX, h))
self.db.exists(Some(BLOCK_PREFIX), h.as_ref())
}
/// Get block_sums for the block hash.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, h), None), || {
format!("Block sums for block: {}", h)
})
option_to_not_found(
self.db.get_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), None),
|| format!("Block sums for block: {}", h),
)
}
/// Get previous header.
@@ -129,7 +153,7 @@ impl ChainStore {
/// Get block header.
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, h), None),
self.db.get_ser(Some(BLOCK_HEADER_PREFIX), h.as_ref(), None),
|| format!("BLOCK HEADER: {}", h),
)
}
@@ -139,8 +163,9 @@ impl ChainStore {
pub fn get_block_header_skip_proof(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db.get_ser(
&to_key(BLOCK_HEADER_PREFIX, h),
Some(ser::DeserializationMode::SkipPow),
Some(BLOCK_HEADER_PREFIX),
h.as_ref(),
Some(DeserializationMode::SkipPow),
),
|| format!("BLOCK HEADER: {}", h),
)
@@ -159,7 +184,8 @@ impl ChainStore {
/// Get PMMR pos and block height for the given output commitment.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<CommitPos>, Error> {
self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit), None)
self.db
.get_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), None)
}
/// Builds a new batch to be used with this store.
@@ -180,17 +206,21 @@ pub struct Batch<'a> {
impl<'a> Batch<'a> {
/// The head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEAD_PREFIX], None), || "HEAD".to_owned())
option_to_not_found(self.db.get_ser(None, &[HEAD_PREFIX], None), || {
"HEAD".to_owned()
})
}
/// The tail.
pub fn tail(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned())
option_to_not_found(self.db.get_ser(None, &[TAIL_PREFIX], None), || {
"TAIL".to_owned()
})
}
/// The current header head (may differ from chain head).
pub fn header_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&[HEADER_HEAD_PREFIX], None), || {
option_to_not_found(self.db.get_ser(None, &[HEADER_HEAD_PREFIX], None), || {
"HEADER_HEAD".to_owned()
})
}
@@ -202,34 +232,35 @@ impl<'a> Batch<'a> {
/// Save body head to db.
pub fn save_body_head(&mut self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[HEAD_PREFIX], t)
self.db.put_ser(None, &[HEAD_PREFIX], t)
}
/// Save body "tail" to db.
pub fn save_body_tail(&mut self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[TAIL_PREFIX], t)
self.db.put_ser(None, &[TAIL_PREFIX], t)
}
/// Save header head to db.
pub fn save_header_head(&mut self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[HEADER_HEAD_PREFIX], t)
self.db.put_ser(None, &[HEADER_HEAD_PREFIX], t)
}
/// Save PIBD head to db.
pub fn save_pibd_head(&mut self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&[PIBD_HEAD_PREFIX], t)
self.db.put_ser(None, &[PIBD_HEAD_PREFIX], t)
}
/// get block
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || {
format!("Block with hash: {}", h)
})
option_to_not_found(
self.db.get_ser(Some(BLOCK_PREFIX), h.as_ref(), None),
|| format!("Block with hash: {}", h),
)
}
/// Does the block exist?
pub fn block_exists(&self, h: &Hash) -> Result<bool, Error> {
self.db.exists(&to_key(BLOCK_PREFIX, h))
self.db.exists(Some(BLOCK_PREFIX), h.as_ref())
}
/// Save the block to the db.
@@ -242,7 +273,7 @@ impl<'a> Batch<'a> {
b.inputs().version_str(),
self.db.protocol_version(),
);
self.db.put_ser(&to_key(BLOCK_PREFIX, b.hash())[..], b)?;
self.db.put_ser(Some(BLOCK_PREFIX), b.hash().as_ref(), b)?;
Ok(())
}
@@ -250,19 +281,19 @@ impl<'a> Batch<'a> {
/// to be easily reverted during rewind.
pub fn save_spent_index(&mut self, h: &Hash, spent: &[CommitPos]) -> Result<(), Error> {
self.db
.put_ser(&to_key(BLOCK_SPENT_PREFIX, h)[..], &spent.to_vec())?;
.put_ser(Some(BLOCK_SPENT_PREFIX), h.as_ref(), &spent.to_vec())?;
Ok(())
}
/// Low level function to delete directly by raw key.
pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
self.db.delete(key)
pub fn delete(&mut self, db_key: Option<u8>, key: &[u8]) -> Result<(), Error> {
self.db.delete(db_key, key)
}
/// Delete a full block. Does not delete any record associated with a block
/// header.
pub fn delete_block(&mut self, bh: &Hash) -> Result<(), Error> {
self.db.delete(&to_key(BLOCK_PREFIX, bh)[..])?;
self.db.delete(Some(BLOCK_PREFIX), bh.as_ref())?;
// Best effort at deleting associated data for this block.
// Not an error if these fail.
@@ -280,7 +311,7 @@ impl<'a> Batch<'a> {
// Store the header itself indexed by hash.
self.db
.put_ser(&to_key(BLOCK_HEADER_PREFIX, hash)[..], header)?;
.put_ser(Some(BLOCK_HEADER_PREFIX), hash.as_ref(), header)?;
Ok(())
}
@@ -292,27 +323,25 @@ impl<'a> Batch<'a> {
pos: CommitPos,
) -> Result<(), Error> {
self.db
.put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &pos)
.put_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), &pos)
}
/// Delete the output_pos index entry for a spent output.
pub fn delete_output_pos_height(&mut self, commit: &Commitment) -> Result<(), Error> {
self.db.delete(&to_key(OUTPUT_POS_PREFIX, commit))
self.db.delete(Some(OUTPUT_POS_PREFIX), commit.as_ref())
}
/// When using the output_pos iterator we have access to the index keys but not the
/// original commitment that the key is constructed from. So we need a way of comparing
/// a key with another commitment without reconstructing the commitment from the key bytes.
pub fn is_match_output_pos_key(&self, key: &[u8], commit: &Commitment) -> bool {
let commit_key = to_key(OUTPUT_POS_PREFIX, commit);
commit_key == key
commit.as_ref() == key
}
/// Iterator over the output_pos index.
pub fn output_pos_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, CommitPos)>, Error> {
let key = to_key(OUTPUT_POS_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |k, mut v| {
self.db.iter(Some(OUTPUT_POS_PREFIX), move |k, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
.map(|pos| (k.to_vec(), pos))
.map_err(From::from)
@@ -332,7 +361,8 @@ impl<'a> Batch<'a> {
/// Get output_pos and block height from index.
pub fn get_output_pos_height(&self, commit: &Commitment) -> Result<Option<CommitPos>, Error> {
self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit), None)
self.db
.get_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), None)
}
/// Get the previous header.
@@ -352,7 +382,7 @@ impl<'a> Batch<'a> {
/// Get block header.
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, h), None),
self.db.get_ser(Some(BLOCK_HEADER_PREFIX), h.as_ref(), None),
|| format!("BLOCK HEADER: {}", h),
)
}
@@ -362,8 +392,9 @@ impl<'a> Batch<'a> {
pub fn get_block_header_skip_proof(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db.get_ser(
&to_key(BLOCK_HEADER_PREFIX, h),
Some(ser::DeserializationMode::SkipPow),
Some(BLOCK_HEADER_PREFIX),
h.as_ref(),
Some(DeserializationMode::SkipPow),
),
|| format!("BLOCK HEADER: {}", h),
)
@@ -371,24 +402,25 @@ impl<'a> Batch<'a> {
/// Delete the block spent index.
fn delete_spent_index(&mut self, bh: &Hash) -> Result<(), Error> {
self.db.delete(&to_key(BLOCK_SPENT_PREFIX, bh))
self.db.delete(Some(BLOCK_SPENT_PREFIX), bh.as_ref())
}
/// Save block_sums for the block.
pub fn save_block_sums(&mut self, h: &Hash, sums: BlockSums) -> Result<(), Error> {
self.db.put_ser(&to_key(BLOCK_SUMS_PREFIX, h)[..], &sums)
self.db.put_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), &sums)
}
/// Get block_sums for the block.
pub fn get_block_sums(&self, h: &Hash) -> Result<BlockSums, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, h), None), || {
format!("Block sums for block: {}", h)
})
option_to_not_found(
self.db.get_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), None),
|| format!("Block sums for block: {}", h),
)
}
/// Delete the block_sums for the block.
fn delete_block_sums(&mut self, bh: &Hash) -> Result<(), Error> {
self.db.delete(&to_key(BLOCK_SUMS_PREFIX, bh))
self.db.delete(Some(BLOCK_SUMS_PREFIX), bh.as_ref())
}
/// Get the block input bitmap based on our spent index.
@@ -406,7 +438,7 @@ impl<'a> Batch<'a> {
/// If we need to rewind a block then we use this to "unspend" the spent outputs.
pub fn get_spent_index(&self, bh: &Hash) -> Result<Vec<CommitPos>, Error> {
option_to_not_found(
self.db.get_ser(&to_key(BLOCK_SPENT_PREFIX, bh), None),
self.db.get_ser(Some(BLOCK_SPENT_PREFIX), bh.as_ref(), None),
|| format!("spent index: {}", bh),
)
}
@@ -428,9 +460,8 @@ impl<'a> Batch<'a> {
/// Iterator over all full blocks in the db.
/// Uses default db serialization strategy via db protocol version.
pub fn blocks_iter(&self) -> Result<impl Iterator<Item = Block> + 'a, Error> {
let key = to_key(BLOCK_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
self.db.iter(Some(BLOCK_PREFIX), move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
.map_err(From::from)
})
@@ -439,8 +470,8 @@ impl<'a> Batch<'a> {
/// Iterator over raw data for full blocks in the db.
/// Used during block migration (we need flexibility around deserialization).
pub fn blocks_raw_iter(&self) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, Error> {
let key = to_key(BLOCK_PREFIX, "");
self.db.iter(&key, |k, v| Ok((k.to_vec(), v.to_vec())))
self.db
.iter(Some(BLOCK_PREFIX), |k, v| Ok((k.to_vec(), v.to_vec())))
}
/// Protocol version of our underlying db.
+1 -1
View File
@@ -657,7 +657,7 @@ impl TxHashSet {
}
}
}
batch.delete(&key)?;
batch.delete(Some(store::OUTPUT_POS_PREFIX), &key)?;
removed_count += 1;
}
debug!(
+25 -19
View File
@@ -20,7 +20,7 @@ use rand::prelude::*;
use crate::core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer};
use crate::types::{Capabilities, PeerAddr, ReasonForBan};
use grin_store::{self, option_to_not_found, to_key, Error};
use grin_store::{self, option_to_not_found, Error};
const DB_NAME: &str = "peer";
const STORE_SUBPATH: &str = "peers";
@@ -118,7 +118,13 @@ pub struct PeerStore {
impl PeerStore {
/// Instantiates a new peer store under the provided root path.
pub fn new(db_root: &str) -> Result<PeerStore, Error> {
let db = grin_store::Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None)?;
let db = grin_store::Store::new(
db_root,
Some(DB_NAME),
Some(STORE_SUBPATH),
vec![PEER_PREFIX],
None,
)?;
Ok(PeerStore { db })
}
@@ -126,7 +132,8 @@ impl PeerStore {
debug!("save_peer: {:?} marked {:?}", p.addr, p.flags);
let mut batch = self.db.batch()?;
batch.put_ser(&peer_key(p.addr)[..], p)?;
let key = p.addr.as_key();
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), p)?;
batch.commit()
}
@@ -134,19 +141,23 @@ impl PeerStore {
let mut batch = self.db.batch()?;
for pd in p {
debug!("save_peers: {:?} marked {:?}", pd.addr, pd.flags);
batch.put_ser(&peer_key(pd.addr)[..], &pd)?;
let key = pd.addr.as_key();
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &pd)?;
}
batch.commit()
}
pub fn get_peer(&self, peer_addr: PeerAddr) -> Result<PeerData, Error> {
option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..], None), || {
format!("Peer at address: {}", peer_addr)
})
let key = peer_addr.as_key();
option_to_not_found(
self.db.get_ser(Some(PEER_PREFIX), key.as_bytes(), None),
|| format!("Peer at address: {}", peer_addr),
)
}
pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result<bool, Error> {
self.db.exists(&peer_key(peer_addr)[..])
let key = peer_addr.as_key();
self.db.exists(Some(PEER_PREFIX), key.as_bytes())
}
/// Convenience method to load a peer data, update its status and save it
@@ -154,9 +165,9 @@ impl PeerStore {
/// If new state is Defunct last connection attempt will be updated too.
pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
let mut batch = self.db.batch()?;
let key = peer_addr.as_key();
let mut peer = option_to_not_found(
batch.get_ser::<PeerData>(&peer_key(peer_addr)[..], None),
batch.get_ser::<PeerData>(Some(PEER_PREFIX), key.as_bytes(), None),
|| format!("Peer at address: {}", peer_addr),
)?;
peer.flags = new_state;
@@ -166,7 +177,7 @@ impl PeerStore {
peer.last_attempt = Utc::now().timestamp();
}
batch.put_ser(&peer_key(peer_addr)[..], &peer)?;
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &peer)?;
batch.commit()
}
@@ -185,9 +196,8 @@ pub struct PeersIterBatch<'a> {
impl<'a> PeersIterBatch<'a> {
/// Iterator over all known peers.
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
self.db.iter(Some(PEER_PREFIX), move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
.map_err(From::from)
})
@@ -230,7 +240,8 @@ impl<'a> PeersIterBatch<'a> {
// Delete peers in single batch
if !to_remove.is_empty() {
for peer in to_remove {
self.db.delete(&peer_key(peer.addr)[..])?;
let key = peer.addr.as_key();
self.db.delete(Some(PEER_PREFIX), key.as_bytes())?;
}
self.db.commit()?;
}
@@ -238,8 +249,3 @@ impl<'a> PeersIterBatch<'a> {
Ok(())
}
}
// Ignore the port unless ip is loopback address.
fn peer_key(peer_addr: PeerAddr) -> Vec<u8> {
to_key(PEER_PREFIX, &peer_addr.as_key())
}
+2 -41
View File
@@ -26,62 +26,23 @@ extern crate log;
extern crate grin_core;
extern crate grin_util as util;
//use grin_core as core;
pub mod leaf_set;
pub mod lmdb;
pub mod pmmr;
pub mod prune_list;
pub mod types;
const SEP: u8 = b':';
use byteorder::{BigEndian, WriteBytesExt};
pub use crate::lmdb::*;
/// Build a db key from a prefix and a byte vector identifier.
pub fn to_key<K: AsRef<[u8]>>(prefix: u8, k: K) -> Vec<u8> {
let k = k.as_ref();
let mut res = Vec::with_capacity(k.len() + 2);
res.push(prefix);
res.push(SEP);
res.extend_from_slice(k);
res
}
/// Build a db key from a prefix and a byte vector identifier and numeric identifier
pub fn to_key_u64<K: AsRef<[u8]>>(prefix: u8, k: K, val: u64) -> Vec<u8> {
let k = k.as_ref();
let mut res = Vec::with_capacity(k.len() + 10);
res.push(prefix);
res.push(SEP);
res.extend_from_slice(k);
res.write_u64::<BigEndian>(val).unwrap();
res
}
/// Build a db key from a prefix and a numeric identifier.
pub fn u64_to_key(prefix: u8, val: u64) -> Vec<u8> {
let mut res = Vec::with_capacity(10);
res.push(prefix);
res.push(SEP);
res.write_u64::<BigEndian>(val).unwrap();
res
}
use std::ffi::OsStr;
use std::fs::{remove_file, rename, File};
use std::path::Path;
/// Creates temporary file with name created by adding `temp_suffix` to `path`.
/// Applies writer function to it and renames temporary file into original specified by `path`.
pub fn save_via_temp_file<F, P, E>(
path: P,
temp_suffix: E,
mut writer: F,
) -> Result<(), std::io::Error>
pub fn save_via_temp_file<F, P, E>(path: P, temp_suffix: E, mut writer: F) -> Result<(), io::Error>
where
F: FnMut(&mut File) -> Result<(), std::io::Error>,
F: FnMut(&mut File) -> Result<(), io::Error>,
P: AsRef<Path>,
E: AsRef<OsStr>,
{
+138 -81
View File
@@ -30,6 +30,7 @@ use crate::util::RwLock;
pub const ALLOC_CHUNK_SIZE_DEFAULT: usize = 134_217_728; //128 MB
/// And for test mode, to avoid too much disk allocation on windows
pub const ALLOC_CHUNK_SIZE_DEFAULT_TEST: usize = 1_048_576; //1 MB
/// Minimal percent of used space when resizing must be performed.
const RESIZE_PERCENT: f32 = 0.9;
/// Want to ensure that each resize gives us at least this %
/// of total space free
@@ -81,7 +82,12 @@ where
const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(3);
/// Default environment.
const DEFAULT_ENV_NAME: &'static str = "lmdb";
/// Default multi-database environment without prefixes.
const DEFAULT_MULTI_DB_ENV_NAME: &'static str = "multi_lmdb";
/// Prefix key separator.
pub const PREFIX_KEY_SEPARATOR: u8 = b':';
/// Mapping of database path to environment.
static ENV_MAP: OnceLock<Arc<RwLock<HashMap<String, Env<WithoutTls>>>>> = OnceLock::new();
@@ -95,29 +101,28 @@ static ENV_RESIZING: OnceLock<Arc<RwLock<HashMap<String, bool>>>> = OnceLock::ne
pub struct Store {
env: Env<WithoutTls>,
env_path: String,
db: Arc<Database<Bytes, Bytes>>,
name: String,
pre_dbs: Arc<HashMap<u8, Database<Bytes, Bytes>>>,
def_db: Database<Bytes, Bytes>,
version: ProtocolVersion,
alloc_chunk_size: usize,
}
impl Store {
/// Create a new LMDB env under the provided directory.
/// By default creates an environment named "lmdb".
/// By default creates an environment named "multi_lmdb".
/// Be aware of transactional semantics in lmdb
/// (transactions are per environment, not per database).
/// db with non-default `env_name` will be migrated into default environment.
/// Data from non-default `env_name` and prefixes will be
/// migrated into default multi db env file if needed.
pub fn new(
root_path: &str,
env_name: Option<&str>,
db_name: Option<&str>,
prefixes: Vec<u8>,
max_readers: Option<u32>,
) -> Result<Store, Error> {
let db_name = db_name.unwrap_or_else(|| DEFAULT_ENV_NAME);
// Database path setup.
let full_path = Path::new(root_path)
.join(DEFAULT_ENV_NAME)
.join(DEFAULT_MULTI_DB_ENV_NAME)
.to_str()
.unwrap()
.to_string();
@@ -142,7 +147,7 @@ impl Store {
if !has_env {
let env = unsafe {
let mut options = EnvOpenOptions::new().read_txn_without_tls();
let mut env_options = options.map_size(alloc_chunk_size).max_dbs(8);
let mut env_options = options.map_size(alloc_chunk_size).max_dbs(24);
if let Some(max_readers) = max_readers {
env_options = env_options.max_readers(max_readers);
}
@@ -154,7 +159,7 @@ impl Store {
env.resize(new_size)?;
};
}
debug!("DB Mapsize for {} is {}", db_name, env.info().map_size);
debug!("DB Mapsize is {}", env.info().map_size);
let mut w_env_map = env_map.write();
w_env_map.insert(full_path.clone(), env);
}
@@ -163,26 +168,34 @@ impl Store {
let r_env_map = env_map.read();
let env = r_env_map.get(&full_path).unwrap();
let mut write = env.write_txn()?;
let db = env.create_database(&mut write, Some(db_name))?;
let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME);
let def_db = env.create_database(&mut write, Some(def_name))?;
let mut dbs_map = HashMap::<u8, Database<Bytes, Bytes>>::new();
for p in prefixes {
let db = env.create_database(&mut write, Some(p.to_string().as_str()))?;
dbs_map.insert(p, db);
}
write.commit()?;
let s = Store {
env: env.clone(),
env_path: full_path.clone(),
db: Arc::new(db),
name: db_name.to_string(),
pre_dbs: Arc::new(dbs_map),
def_db,
version: DEFAULT_DB_VERSION,
alloc_chunk_size,
};
// Migrate to default environment if needed.
if let Some(env_name) = env_name {
if env_name != DEFAULT_ENV_NAME {
let migrate_from = Path::new(root_path).join(env_name);
if s.migrate_to_default_env(&migrate_from).is_ok() {
let _ = fs::remove_dir_all(&migrate_from);
} else {
error!("Migrating DB {} failed", env_name);
let env_name = env_name.unwrap_or(DEFAULT_ENV_NAME);
if env_name != DEFAULT_MULTI_DB_ENV_NAME {
let migrate_from = Path::new(root_path).join(env_name);
if migrate_from.exists() {
match s.migrate_to_default_env(db_name, &migrate_from) {
Ok(_) => {
let _ = fs::remove_dir_all(&migrate_from);
}
Err(e) => error!("DB {} migration error: {:?}", env_name, e),
}
}
}
@@ -191,20 +204,27 @@ impl Store {
}
/// Migrate db from provided path to store environment.
fn migrate_to_default_env(&self, from_path: &Path) -> Result<(), Error> {
if !from_path.exists() {
return Ok(());
};
debug!("Migrating DB {} to {}", self.name, DEFAULT_ENV_NAME);
fn migrate_to_default_env(
&self,
from_name: Option<&str>,
from_path: &Path,
) -> Result<(), Error> {
debug!("Migrating DB {:?}", from_path);
let from_env = unsafe {
let mut options = EnvOpenOptions::new().read_txn_without_tls();
let env_options = options.map_size(self.alloc_chunk_size).max_dbs(1);
let env_options = options.map_size(self.alloc_chunk_size).max_dbs(24);
env_options.open(from_path)?
};
let (resize, new_size) = needs_resize(&from_env, self.alloc_chunk_size);
if resize {
unsafe {
from_env.resize(new_size)?;
self.env.resize(new_size)?;
};
}
let db_from = {
let mut write = from_env.write_txn()?;
let db_name = self.name.as_str();
let db: Database<Bytes, Bytes> = from_env.create_database(&mut write, Some(db_name))?;
let db: Database<Bytes, Bytes> = from_env.create_database(&mut write, from_name)?;
write.commit()?;
db
};
@@ -212,41 +232,51 @@ impl Store {
let read_from = from_env.read_txn()?;
let mut count = 0;
for kv in db_from.iter(&read_from)? {
count += 1;
if let Ok((k, v)) = kv {
self.db.put(&mut write_to, &k, &v)?;
if k.contains(&PREFIX_KEY_SEPARATOR) {
let db_name = k.split_at(1).0;
let db = self.pre_dbs.get(&db_name[0]).unwrap();
let key = k.split_at(2).1;
db.put(&mut write_to, key, &v)?;
count += 1;
} else {
self.def_db.put(&mut write_to, k, &v)?;
count += 1;
}
}
}
write_to.commit()?;
debug!("Migrated {} records from DB {}", count, self.name);
debug!("Migrated {} records from DB {:?}", count, from_path);
Ok(())
}
/// Construct a new store using a specific protocol version.
/// Permits access to the db with legacy protocol versions for db migrations.
pub fn with_version(&self, version: ProtocolVersion) -> Store {
Store {
env: self.env.clone(),
env_path: self.env_path.clone(),
db: self.db.clone(),
name: self.name.clone(),
version,
alloc_chunk_size: self.alloc_chunk_size,
}
}
/// Protocol version for the store.
pub fn protocol_version(&self) -> ProtocolVersion {
self.version
}
/// Get database from provided key or return default.
fn get_db(&self, db_key: Option<u8>) -> &Database<Bytes, Bytes> {
match db_key {
Some(db) => self.pre_dbs.get(&db).unwrap(),
None => &self.def_db,
}
}
/// Gets a value from the db, provided its key.
/// Deserializes the retrieved data using the provided function.
fn get_with<F, T>(&self, key: &[u8], read: &RoTxn, deserialize: F) -> Result<Option<T>, Error>
fn get_with<F, T>(
&self,
db_key: Option<u8>,
key: &[u8],
read: &RoTxn,
deserialize: F,
) -> Result<Option<T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
let res: Option<&[u8]> = self.db.get(read, key)?;
let db = self.get_db(db_key);
let res: Option<&[u8]> = db.get(read, key)?;
match res {
None => Ok(None),
Some(res) => deserialize(key, res).map(Some),
@@ -257,6 +287,7 @@ impl Store {
/// Note: Creates a new read transaction so will *not* see any uncommitted data.
pub fn get_ser<T: ser::Readable>(
&self,
db_key: Option<u8>,
key: &[u8],
deser_mode: Option<DeserializationMode>,
) -> Result<Option<T>, Error> {
@@ -266,30 +297,35 @@ impl Store {
};
self.wait_for_resize();
let read = self.env.read_txn()?;
self.get_with(key, &read, |_, mut data| {
self.get_with(db_key, key, &read, |_, mut data| {
ser::deserialize(&mut data, self.protocol_version(), d).map_err(From::from)
})
}
/// Whether the provided key exists.
pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
pub fn exists(&self, db_key: Option<u8>, key: &[u8]) -> Result<bool, Error> {
self.wait_for_resize();
let read = self.env.read_txn()?;
let res = self.db.get(&read, key)?;
let db = self.get_db(db_key);
let res = db.get(&read, key)?;
Ok(res.is_some())
}
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
/// Produces an iterator from the provided db name.
pub fn iter<F, T>(
&self,
db_key: Option<u8>,
deserialize: F,
) -> Result<DatabaseIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
self.wait_for_resize();
let read = self.env.clone().static_read_txn()?;
Ok(PrefixIterator::new(
self.db.clone(),
let db = self.get_db(db_key);
Ok(DatabaseIterator::new(
Arc::new(db.clone()),
read,
prefix,
deserialize,
))
}
@@ -305,7 +341,7 @@ impl Store {
if !resizing {
break;
}
debug!("Wait on {}, resizing DB", self.name);
trace!("Wait on resizing DB {}", self.env_path);
thread::sleep(Duration::from_millis(100));
}
}
@@ -320,7 +356,7 @@ impl Store {
let mut w_res_map = res_map.write();
w_res_map.insert(self.env_path.clone(), true);
}
debug!("Start resizing {} DB", self.name);
debug!("Start resizing DB {}", self.env_path);
unsafe {
loop {
let batches_count =
@@ -330,7 +366,10 @@ impl Store {
if cur == &0 {
break;
}
debug!("Wait {} batches to complete", cur);
debug!(
"Wait {} batches to complete to resize DB {}",
cur, self.env_path
);
thread::sleep(Duration::from_millis(100));
}
self.env.resize(new_size)?;
@@ -339,7 +378,7 @@ impl Store {
let mut w_res_map = res_map.write();
w_res_map.insert(self.env_path.clone(), false);
}
debug!("End resizing {} DB", self.name);
debug!("End resizing DB {}", self.env_path);
}
Ok(())
}
@@ -385,15 +424,21 @@ impl<'a> Batch<'a> {
}
/// Writes a single key/value pair to the db.
pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.store.db.put(&mut self.write, key, value)?;
pub fn put(&mut self, db_key: Option<u8>, key: &[u8], value: &[u8]) -> Result<(), Error> {
let db = self.store.get_db(db_key);
db.put(&mut self.write, key, value)?;
Ok(())
}
/// Writes a single key and its `Writeable` value to the db.
/// Encapsulates serialization using the (default) version configured on the store instance.
pub fn put_ser<W: ser::Writeable>(&mut self, key: &[u8], value: &W) -> Result<(), Error> {
self.put_ser_with_version(key, value, self.store.protocol_version())
pub fn put_ser<W: ser::Writeable>(
&mut self,
db_key: Option<u8>,
key: &[u8],
value: &W,
) -> Result<(), Error> {
self.put_ser_with_version(db_key, key, value, self.store.protocol_version())
}
/// Protocol version used by this batch.
@@ -405,46 +450,58 @@ impl<'a> Batch<'a> {
/// Encapsulates serialization using the specified protocol version.
pub fn put_ser_with_version<W: ser::Writeable>(
&mut self,
db_key: Option<u8>,
key: &[u8],
value: &W,
version: ProtocolVersion,
) -> Result<(), Error> {
let ser_value = ser::ser_vec(value, version);
match ser_value {
Ok(data) => self.put(key, &data),
Ok(data) => self.put(db_key, key, &data),
Err(err) => Err(err.into()),
}
}
/// Low-level access for retrieving data by key.
/// Takes a function for flexible deserialization.
fn get_with<F, T>(&self, key: &[u8], deserialize: F) -> Result<Option<T>, Error>
fn get_with<F, T>(
&self,
db_key: Option<u8>,
key: &[u8],
deserialize: F,
) -> Result<Option<T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
let read = self.write.nested_read_txn()?;
self.store.get_with(key, &read, deserialize)
self.store.get_with(db_key, key, &read, deserialize)
}
/// Whether the provided key exists.
/// This is in the context of the current write transaction.
pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
pub fn exists(&self, db_key: Option<u8>, key: &[u8]) -> Result<bool, Error> {
let read = self.write.nested_read_txn()?;
let res = self.store.db.get(&read, key)?;
let db = self.store.get_db(db_key);
let res = db.get(&read, key)?;
Ok(res.is_some())
}
/// Produces an iterator from the provided key prefix.
pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
/// Produces an iterator from the provided db key.
pub fn iter<F, T>(
&self,
db_key: Option<u8>,
deserialize: F,
) -> Result<DatabaseIterator<F, T>, Error>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
self.store.iter(prefix, deserialize)
self.store.iter(db_key, deserialize)
}
/// Gets a `Readable` value from the db by provided key and provided deserialization strategy.
pub fn get_ser<T: ser::Readable>(
&self,
db_key: Option<u8>,
key: &[u8],
deser_mode: Option<DeserializationMode>,
) -> Result<Option<T>, Error> {
@@ -452,7 +509,7 @@ impl<'a> Batch<'a> {
Some(d) => d,
_ => DeserializationMode::default(),
};
self.get_with(key, |_, mut data| {
self.get_with(db_key, key, |_, mut data| {
match ser::deserialize(&mut data, self.protocol_version(), d) {
Ok(res) => Ok(res),
Err(e) => Err(From::from(e)),
@@ -461,8 +518,9 @@ impl<'a> Batch<'a> {
}
/// Deletes a key/value pair from the db.
pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
self.store.db.delete(&mut self.write, key)?;
pub fn delete(&mut self, db_key: Option<u8>, key: &[u8]) -> Result<(), Error> {
let db = self.store.get_db(db_key);
db.delete(&mut self.write, key)?;
Ok(())
}
@@ -488,9 +546,9 @@ impl<'a> Batch<'a> {
}
}
/// An iterator based on key prefix.
/// An iterator based on db key.
/// Caller is responsible for deserialization of the data.
pub struct PrefixIterator<F, T>
pub struct DatabaseIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
@@ -501,7 +559,7 @@ where
deserialize: F,
}
impl<F, T> Iterator for PrefixIterator<F, T>
impl<F, T> Iterator for DatabaseIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
@@ -524,7 +582,7 @@ where
}
}
impl<F, T> PrefixIterator<F, T>
impl<F, T> DatabaseIterator<F, T>
where
F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
@@ -532,10 +590,9 @@ where
pub fn new(
db: Arc<Database<Bytes, Bytes>>,
read: RoTxn<'static, WithoutTls>,
prefix: &[u8],
deserialize: F,
) -> PrefixIterator<F, T> {
let keys = if let Ok(iter) = db.prefix_iter(&read, &prefix) {
) -> DatabaseIterator<F, T> {
let keys = if let Ok(iter) = db.iter(&read) {
iter.move_between_keys()
.filter(|kv| kv.is_ok())
.map(|kv| kv.unwrap().0.to_vec())
@@ -543,7 +600,7 @@ where
} else {
vec![]
};
PrefixIterator {
DatabaseIterator {
db,
read: Arc::new(read),
keys,
+16 -15
View File
@@ -70,25 +70,26 @@ fn test_exists() -> Result<(), store::Error> {
let test_dir = "target/test_exists";
setup(test_dir);
let store = store::Store::new(test_dir, Some("test1"), None, None)?;
let prefix = b'P';
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?;
let key = [0, 0, 0, 1];
let value = [1, 1, 1, 1];
// Start new batch and insert a new key/value entry.
let mut batch = store.batch()?;
batch.put(&key, &value)?;
batch.put(Some(prefix), &key, &value)?;
// Check we can see the new entry in uncommitted batch.
assert!(batch.exists(&key)?);
assert!(batch.exists(Some(prefix), &key)?);
// Check we cannot see the new entry yet outside of the uncommitted batch.
assert!(!store.exists(&key)?);
assert!(!store.exists(Some(prefix), &key)?);
batch.commit()?;
// Check we can see the new entry after committing the batch.
assert!(store.exists(&key)?);
assert!(store.exists(Some(prefix), &key)?);
clean_output_dir(test_dir);
Ok(())
@@ -99,14 +100,15 @@ fn test_iter() -> Result<(), store::Error> {
let test_dir = "target/test_iter";
setup(test_dir);
let store = store::Store::new(test_dir, Some("test1"), None, None)?;
let prefix = b'P';
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?;
let key = [0, 0, 0, 1];
let value = [1, 1, 1, 1];
// Start new batch and insert a new key/value entry.
let mut batch = store.batch()?;
batch.put(&key, &value)?;
batch.put(Some(prefix), &key, &value)?;
// TODO - This is not currently possible (and we need to be aware of this).
// Currently our SerIterator is limited to using a ReadTransaction only.
@@ -117,13 +119,13 @@ fn test_iter() -> Result<(), store::Error> {
// assert_eq!(iter.next(), None);
// Check we can not yet see the new entry via an iterator outside the uncommitted batch.
let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?;
let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?;
assert_eq!(iter.next(), None);
batch.commit()?;
// Check we can see the new entry via an iterator after committing the batch.
let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?;
let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?;
assert_eq!(iter.next(), Some(value.to_vec()));
assert_eq!(iter.next(), None);
@@ -135,18 +137,18 @@ fn test_iter() -> Result<(), store::Error> {
fn lmdb_allocate() -> Result<(), store::Error> {
let test_dir = "target/lmdb_allocate";
setup(test_dir);
let prefix = b'P';
// Allocate more than the initial chunk, ensuring
// the DB resizes underneath
{
let store = store::Store::new(test_dir, Some("test1"), None, None)?;
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?;
for i in 0..WRITE_CHUNK_SIZE * 2 {
println!("Allocating chunk: {}", i);
let chunk = PhatChunkStruct::new();
let key_val = format!("phat_chunk_set_1_{}", i);
let mut batch = store.batch()?;
let key = store::to_key(b'P', &key_val);
batch.put_ser(&key, &chunk)?;
batch.put_ser(Some(prefix), key_val.as_bytes(), &chunk)?;
batch.commit()?;
}
}
@@ -155,14 +157,13 @@ fn lmdb_allocate() -> Result<(), store::Error> {
println!("***********************************");
// Open env again and keep adding
{
let store = store::Store::new(test_dir, Some("test1"), None, None)?;
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?;
for i in 0..WRITE_CHUNK_SIZE * 2 {
println!("Allocating chunk: {}", i);
let chunk = PhatChunkStruct::new();
let key_val = format!("phat_chunk_set_2_{}", i);
let mut batch = store.batch()?;
let key = store::to_key(b'P', &key_val);
batch.put_ser(&key, &chunk)?;
batch.put_ser(Some(prefix), key_val.as_bytes(), &chunk)?;
batch.commit()?;
}
}