Compare commits
62 Commits
2c3a067629
...
bce5a7144b
| Author | SHA1 | Date | |
|---|---|---|---|
| bce5a7144b | |||
| b41a0cc209 | |||
| a25fde9674 | |||
| ee4390e437 | |||
| 68053a203e | |||
| 62e5ace442 | |||
| cf2ed3f3be | |||
| 386ac1ed5c | |||
| 110e0e143f | |||
| 0dc4280b68 | |||
| f3796fa27b | |||
| dc7b49004d | |||
| 22e34311b8 | |||
| a81f59926c | |||
| 20e4e845a1 | |||
| 2d0ce2d526 | |||
| c4f054ab51 | |||
| 048453f989 | |||
| 80cba965ab | |||
| 86eaa7b7ac | |||
| fde914d11a | |||
| 473f9f33ad | |||
| 0321bf5b14 | |||
| be37f46f59 | |||
| 3a37d24dca | |||
| 114c0eefa4 | |||
| fb87a26d3f | |||
| f5d59e595e | |||
| 5bebacd605 | |||
| 95486a4bc0 | |||
| e6265a810d | |||
| 5fb9c5badf | |||
| 0ea779e777 | |||
| bd63c266e2 | |||
| f80e450896 | |||
| 1832e1e907 | |||
| 2164f6098f | |||
| 0bedfb39d7 | |||
| 13cfef93ec | |||
| 36211eef59 | |||
| 03ddb5f118 | |||
| 29f822f298 | |||
| 3d14a54d69 | |||
| 769da6dbdd | |||
| dfac42618c | |||
| b41f0bdbd4 | |||
| ab907508c7 | |||
| 3d8cb52995 | |||
| f47bf935ab | |||
| 0c3e18a913 | |||
| 42b928a42f | |||
| cc555a22fb | |||
| b172256d0b | |||
| 1258b844f3 | |||
| 91f6ddc8c3 | |||
| 3787881673 | |||
| 9585c02135 | |||
| ab9715d9a9 | |||
| 464babc7ef | |||
| 4511fd89a2 | |||
| d9bca1c776 | |||
| 397b6cd116 |
@@ -2,7 +2,7 @@ name: Build and Push to GHCR
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master, staging]
|
||||
branches: [grim]
|
||||
|
||||
env:
|
||||
GHCR_IMAGE: ghcr.io/${{ secrets.GHCR_USERNAME }}/${{ github.event.repository.name }}
|
||||
|
||||
Generated
+11
-11
@@ -957,7 +957,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"blake2-rfc",
|
||||
"built",
|
||||
@@ -987,7 +987,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_api"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"bytes 1.7.1",
|
||||
@@ -1020,7 +1020,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_chain"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"bit-vec",
|
||||
"bitflags 1.3.2",
|
||||
@@ -1044,7 +1044,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_config"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"dirs",
|
||||
"grin_core",
|
||||
@@ -1060,7 +1060,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_core"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"blake2-rfc",
|
||||
"byteorder",
|
||||
@@ -1086,7 +1086,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_keychain"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"blake2-rfc",
|
||||
"byteorder",
|
||||
@@ -1107,7 +1107,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_p2p"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"built",
|
||||
@@ -1130,7 +1130,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_pool"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"blake2-rfc",
|
||||
"chrono",
|
||||
@@ -1162,7 +1162,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_servers"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"chrono",
|
||||
@@ -1192,7 +1192,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_store"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"chrono",
|
||||
@@ -1214,7 +1214,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "grin_util"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"backtrace",
|
||||
|
||||
+12
-12
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -34,15 +34,15 @@ serde_json = "1"
|
||||
log = "0.4"
|
||||
term = "0.6"
|
||||
|
||||
grin_api = { path = "./api", version = "5.4.0" }
|
||||
grin_config = { path = "./config", version = "5.4.0" }
|
||||
grin_chain = { path = "./chain", version = "5.4.0" }
|
||||
grin_core = { path = "./core", version = "5.4.0" }
|
||||
grin_keychain = { path = "./keychain", version = "5.4.0" }
|
||||
grin_p2p = { path = "./p2p", version = "5.4.0" }
|
||||
grin_servers = { path = "./servers", version = "5.4.0" }
|
||||
grin_util = { path = "./util", version = "5.4.0" }
|
||||
grin_store = { path = "./store", version = "5.4.0" }
|
||||
grin_api = { path = "./api", version = "5.4.1" }
|
||||
grin_config = { path = "./config", version = "5.4.1" }
|
||||
grin_chain = { path = "./chain", version = "5.4.1" }
|
||||
grin_core = { path = "./core", version = "5.4.1" }
|
||||
grin_keychain = { path = "./keychain", version = "5.4.1" }
|
||||
grin_p2p = { path = "./p2p", version = "5.4.1" }
|
||||
grin_servers = { path = "./servers", version = "5.4.1" }
|
||||
grin_util = { path = "./util", version = "5.4.1" }
|
||||
grin_store = { path = "./store", version = "5.4.1" }
|
||||
|
||||
[dependencies.cursive]
|
||||
version = "0.21"
|
||||
@@ -53,5 +53,5 @@ features = ["pancurses-backend"]
|
||||
built = { version = "0.8.0", features = ["git2"]}
|
||||
|
||||
[dev-dependencies]
|
||||
grin_chain = { path = "./chain", version = "5.4.0" }
|
||||
grin_store = { path = "./store", version = "5.4.0" }
|
||||
grin_chain = { path = "./chain", version = "5.4.1" }
|
||||
grin_store = { path = "./store", version = "5.4.1" }
|
||||
|
||||
+7
-7
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_api"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "APIs for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -32,9 +32,9 @@ async-stream = "0.3"
|
||||
url = "2.1"
|
||||
bytes = "1"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_chain = { path = "../chain", version = "5.4.0" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.0" }
|
||||
grin_pool = { path = "../pool", version = "5.4.0" }
|
||||
grin_store = { path = "../store", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_chain = { path = "../chain", version = "5.4.1" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.1" }
|
||||
grin_pool = { path = "../pool", version = "5.4.1" }
|
||||
grin_store = { path = "../store", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
+5
-5
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_chain"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -23,10 +23,10 @@ chrono = "0.4.11"
|
||||
lru-cache = "0.1"
|
||||
lazy_static = "1"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.0" }
|
||||
grin_store = { path = "../store", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.1" }
|
||||
grin_store = { path = "../store", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.7"
|
||||
|
||||
@@ -43,6 +43,9 @@ pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20;
|
||||
/// will always be requested first)
|
||||
pub const SEGMENT_REQUEST_COUNT: usize = 15;
|
||||
|
||||
/// How many blocks behind the tip a PIBD peer may be and still be considered usable.
|
||||
pub const PIBD_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2;
|
||||
|
||||
/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
|
||||
/// give up and revert back to the txhashset.zip download method
|
||||
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60;
|
||||
|
||||
@@ -280,6 +280,102 @@ pub struct BitmapSegment {
|
||||
proof: SegmentProof,
|
||||
}
|
||||
|
||||
impl BitmapSegment {
|
||||
// Matches the upper end of the currently served PIBD bitmap segment range.
|
||||
const MAX_SEGMENT_HEIGHT: u8 = 13;
|
||||
|
||||
fn max_chunks(identifier: &SegmentIdentifier) -> Result<usize, ser::Error> {
|
||||
if identifier.height > Self::MAX_SEGMENT_HEIGHT {
|
||||
return Err(ser::Error::TooLargeReadErr);
|
||||
}
|
||||
1usize
|
||||
.checked_shl(identifier.height as u32)
|
||||
.ok_or(ser::Error::TooLargeReadErr)
|
||||
}
|
||||
|
||||
fn leaf_offset(identifier: &SegmentIdentifier) -> Result<u64, ser::Error> {
|
||||
let segment_capacity = 1u64
|
||||
.checked_shl(identifier.height as u32)
|
||||
.ok_or(ser::Error::TooLargeReadErr)?;
|
||||
segment_capacity
|
||||
.checked_mul(identifier.idx)
|
||||
.ok_or(ser::Error::TooLargeReadErr)
|
||||
}
|
||||
|
||||
fn n_chunks(blocks: &[BitmapBlock]) -> Result<usize, ser::Error> {
|
||||
let (last, full_blocks) = blocks.split_last().ok_or(ser::Error::CorruptedData)?;
|
||||
for block in full_blocks {
|
||||
if block.try_n_chunks()? != BitmapBlock::NCHUNKS {
|
||||
return Err(ser::Error::CorruptedData);
|
||||
}
|
||||
}
|
||||
let last_chunks = last.try_n_chunks()?;
|
||||
if last_chunks == 0 {
|
||||
return Err(ser::Error::CorruptedData);
|
||||
}
|
||||
full_blocks
|
||||
.len()
|
||||
.checked_mul(BitmapBlock::NCHUNKS)
|
||||
.and_then(|n| n.checked_add(last_chunks))
|
||||
.ok_or(ser::Error::TooLargeReadErr)
|
||||
}
|
||||
|
||||
fn validate_blocks(
|
||||
identifier: &SegmentIdentifier,
|
||||
blocks: &[BitmapBlock],
|
||||
) -> Result<usize, ser::Error> {
|
||||
let offset = Self::leaf_offset(identifier)?;
|
||||
let n_chunks = Self::n_chunks(blocks)?;
|
||||
if n_chunks > Self::max_chunks(identifier)? {
|
||||
return Err(ser::Error::TooLargeReadErr);
|
||||
}
|
||||
offset
|
||||
.checked_add((n_chunks - 1) as u64)
|
||||
.ok_or(ser::Error::TooLargeReadErr)?;
|
||||
Ok(n_chunks)
|
||||
}
|
||||
|
||||
/// Convert this bitmap segment into a PMMR segment, validating its encoded shape.
|
||||
pub fn into_segment(self) -> Result<Segment<BitmapChunk>, ser::Error> {
|
||||
let BitmapSegment {
|
||||
identifier,
|
||||
blocks,
|
||||
proof,
|
||||
} = self;
|
||||
|
||||
let n_chunks = Self::validate_blocks(&identifier, &blocks)?;
|
||||
let mut leaf_pos = Vec::with_capacity(n_chunks);
|
||||
let mut chunks = Vec::with_capacity(n_chunks);
|
||||
let offset = Self::leaf_offset(&identifier)?;
|
||||
for i in 0..(n_chunks as u64) {
|
||||
let insertion_idx = offset.checked_add(i).ok_or(ser::Error::TooLargeReadErr)?;
|
||||
leaf_pos.push(pmmr::insertion_to_pmmr_index(insertion_idx));
|
||||
chunks.push(BitmapChunk::new());
|
||||
}
|
||||
|
||||
for (block_idx, block) in blocks.into_iter().enumerate() {
|
||||
block.try_n_chunks()?;
|
||||
let offset = block_idx * BitmapBlock::NCHUNKS;
|
||||
for (i, _) in block.inner.iter().enumerate().filter(|&(_, v)| v) {
|
||||
chunks
|
||||
.get_mut(offset + i / BitmapChunk::LEN_BITS)
|
||||
.ok_or(ser::Error::CorruptedData)?
|
||||
.0
|
||||
.set(i % BitmapChunk::LEN_BITS, true);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Segment::from_parts(
|
||||
identifier,
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
leaf_pos,
|
||||
chunks,
|
||||
proof,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Writeable for BitmapSegment {
|
||||
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
|
||||
Writeable::write(&self.identifier, writer)?;
|
||||
@@ -297,10 +393,20 @@ impl Readable for BitmapSegment {
|
||||
let identifier: SegmentIdentifier = Readable::read(reader)?;
|
||||
|
||||
let n_blocks = reader.read_u16()? as usize;
|
||||
if n_blocks == 0 {
|
||||
return Err(ser::Error::CorruptedData);
|
||||
}
|
||||
let max_blocks = (BitmapSegment::max_chunks(&identifier)? + BitmapBlock::NCHUNKS - 1)
|
||||
/ BitmapBlock::NCHUNKS;
|
||||
if n_blocks > max_blocks {
|
||||
return Err(ser::Error::TooLargeReadErr);
|
||||
}
|
||||
BitmapSegment::leaf_offset(&identifier)?;
|
||||
let mut blocks = Vec::<BitmapBlock>::with_capacity(n_blocks);
|
||||
for _ in 0..n_blocks {
|
||||
blocks.push(Readable::read(reader)?);
|
||||
}
|
||||
BitmapSegment::validate_blocks(&identifier, &blocks)?;
|
||||
let proof = Readable::read(reader)?;
|
||||
|
||||
Ok(Self {
|
||||
@@ -348,36 +454,7 @@ impl From<Segment<BitmapChunk>> for BitmapSegment {
|
||||
// TODO: this can be sped up with some `unsafe` code
|
||||
impl From<BitmapSegment> for Segment<BitmapChunk> {
|
||||
fn from(segment: BitmapSegment) -> Self {
|
||||
let BitmapSegment {
|
||||
identifier,
|
||||
blocks,
|
||||
proof,
|
||||
} = segment;
|
||||
|
||||
// Count the number of chunks taking into account that the final block might be smaller
|
||||
let n_chunks = (blocks.len() - 1) * BitmapBlock::NCHUNKS
|
||||
+ blocks.last().map(|b| b.n_chunks()).unwrap_or(0);
|
||||
let mut leaf_pos = Vec::with_capacity(n_chunks);
|
||||
let mut chunks = Vec::with_capacity(n_chunks);
|
||||
let offset = (1 << identifier.height) * identifier.idx;
|
||||
for i in 0..(n_chunks as u64) {
|
||||
leaf_pos.push(pmmr::insertion_to_pmmr_index(offset + i));
|
||||
chunks.push(BitmapChunk::new());
|
||||
}
|
||||
|
||||
for (block_idx, block) in blocks.into_iter().enumerate() {
|
||||
assert!(block.inner.len() <= BitmapBlock::NBITS as usize);
|
||||
let offset = block_idx * BitmapBlock::NCHUNKS;
|
||||
for (i, _) in block.inner.iter().enumerate().filter(|&(_, v)| v) {
|
||||
chunks
|
||||
.get_mut(offset + i / BitmapChunk::LEN_BITS)
|
||||
.unwrap()
|
||||
.0
|
||||
.set(i % BitmapChunk::LEN_BITS, true);
|
||||
}
|
||||
}
|
||||
|
||||
Segment::from_parts(identifier, Vec::new(), Vec::new(), leaf_pos, chunks, proof)
|
||||
segment.into_segment().expect("valid bitmap segment")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -401,12 +478,16 @@ impl BitmapBlock {
|
||||
}
|
||||
}
|
||||
|
||||
fn n_chunks(&self) -> usize {
|
||||
fn try_n_chunks(&self) -> Result<usize, ser::Error> {
|
||||
let length = self.inner.len();
|
||||
assert_eq!(length % BitmapChunk::LEN_BITS, 0);
|
||||
if length % BitmapChunk::LEN_BITS != 0 {
|
||||
return Err(ser::Error::CorruptedData);
|
||||
}
|
||||
let n_chunks = length / BitmapChunk::LEN_BITS;
|
||||
assert!(n_chunks <= BitmapBlock::NCHUNKS);
|
||||
n_chunks
|
||||
if n_chunks > BitmapBlock::NCHUNKS {
|
||||
return Err(ser::Error::TooLargeReadErr);
|
||||
}
|
||||
Ok(n_chunks)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -497,102 +497,126 @@ impl Desegmenter {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// We have all required bitmap segments and have recreated our local
|
||||
// bitmap, now continue with other segments, evenly spreading requests
|
||||
// among MMRs
|
||||
let local_output_mmr_size;
|
||||
let local_kernel_mmr_size;
|
||||
let local_rangeproof_mmr_size;
|
||||
{
|
||||
let txhashset = self.txhashset.read();
|
||||
local_output_mmr_size = txhashset.output_mmr_size();
|
||||
local_kernel_mmr_size = txhashset.kernel_mmr_size();
|
||||
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
|
||||
}
|
||||
let total_output_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_output_segment_height,
|
||||
);
|
||||
let mut elems_added = 0;
|
||||
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
|
||||
while (next_output_idx as usize) < total_output_segments {
|
||||
if self.output_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let output_id = SegmentIdentifier {
|
||||
height: self.default_output_segment_height,
|
||||
idx: next_output_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
output_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_output_idx += 1;
|
||||
}
|
||||
}
|
||||
// Bitmap is not finished yet, continue at next iteration when it will be ready.
|
||||
return return_vec;
|
||||
}
|
||||
|
||||
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_rangeproof_segment_height,
|
||||
);
|
||||
elems_added = 0;
|
||||
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
|
||||
while (next_rp_idx as usize) < total_rangeproof_segments {
|
||||
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let rp_id = SegmentIdentifier {
|
||||
height: self.default_rangeproof_segment_height,
|
||||
idx: next_rp_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
rp_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_rangeproof_mmr_size
|
||||
&& !self.has_rangeproof_segment_with_id(rp_id)
|
||||
{
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_rp_idx += 1;
|
||||
// We have all required bitmap segments and have recreated our local
|
||||
// bitmap, now continue with other segments, evenly spreading requests
|
||||
// among MMRs
|
||||
let local_output_mmr_size;
|
||||
let local_kernel_mmr_size;
|
||||
let local_rangeproof_mmr_size;
|
||||
{
|
||||
let txhashset = self.txhashset.read();
|
||||
local_output_mmr_size = txhashset.output_mmr_size();
|
||||
local_kernel_mmr_size = txhashset.kernel_mmr_size();
|
||||
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
|
||||
}
|
||||
let total_output_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_output_segment_height,
|
||||
);
|
||||
let mut elems_added = 0;
|
||||
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
|
||||
while (next_output_idx as usize) < total_output_segments {
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let total_kernel_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.kernel_mmr_size,
|
||||
self.default_kernel_segment_height,
|
||||
);
|
||||
elems_added = 0;
|
||||
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
|
||||
while (next_kernel_idx as usize) < total_kernel_segments {
|
||||
if self.kernel_segment_cache.len() >= self.max_cached_segments {
|
||||
break;
|
||||
}
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let k_id = SegmentIdentifier {
|
||||
height: self.default_kernel_segment_height,
|
||||
idx: next_kernel_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
|
||||
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_kernel_idx += 1;
|
||||
let output_id = SegmentIdentifier {
|
||||
height: self.default_output_segment_height,
|
||||
idx: next_output_idx,
|
||||
};
|
||||
let (_first, last) =
|
||||
output_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_output_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.output_mmr_size,
|
||||
self.default_rangeproof_segment_height,
|
||||
);
|
||||
elems_added = 0;
|
||||
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
|
||||
while (next_rp_idx as usize) < total_rangeproof_segments {
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let rp_id = SegmentIdentifier {
|
||||
height: self.default_rangeproof_segment_height,
|
||||
idx: next_rp_idx,
|
||||
};
|
||||
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
|
||||
if last > local_rangeproof_mmr_size && !self.has_rangeproof_segment_with_id(rp_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_rp_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let total_kernel_segments = SegmentIdentifier::count_segments_required(
|
||||
self.archive_header.kernel_mmr_size,
|
||||
self.default_kernel_segment_height,
|
||||
);
|
||||
elems_added = 0;
|
||||
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
|
||||
while (next_kernel_idx as usize) < total_kernel_segments {
|
||||
if elems_added == max_elements / 3 {
|
||||
break;
|
||||
}
|
||||
let k_id = SegmentIdentifier {
|
||||
height: self.default_kernel_segment_height,
|
||||
idx: next_kernel_idx,
|
||||
};
|
||||
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
|
||||
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
|
||||
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
|
||||
elems_added += 1;
|
||||
}
|
||||
next_kernel_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Explicitly add segment identifier to request if not exists.
|
||||
let mut maybe_add_to_request = |seg_id: SegmentTypeIdentifier| {
|
||||
if !return_vec.iter().any(|i| i == &seg_id) {
|
||||
if return_vec.len() >= max_elements {
|
||||
return_vec.pop();
|
||||
}
|
||||
return_vec.push(seg_id);
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure we explicitly ask for the next output segment.
|
||||
if let Some(next_output_idx) = self.next_required_output_segment_index() {
|
||||
let seg_id = SegmentIdentifier {
|
||||
height: self.default_output_segment_height,
|
||||
idx: next_output_idx,
|
||||
};
|
||||
if !self.has_output_segment_with_id(seg_id) {
|
||||
let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id);
|
||||
maybe_add_to_request(next_output_seg_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure we explicitly ask for the next rangeproof segment.
|
||||
if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
|
||||
let seg_id = SegmentIdentifier {
|
||||
height: self.default_rangeproof_segment_height,
|
||||
idx: next_rp_idx,
|
||||
};
|
||||
if !self.has_rangeproof_segment_with_id(seg_id) {
|
||||
let next_proof_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id);
|
||||
maybe_add_to_request(next_proof_seg_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Always ensure we explicitly ask for the very next kernel segment we are waiting on.
|
||||
// The regular round-robin above can get saturated with outputs and rangeproofs while
|
||||
// the desegmenter is blocked on a missing kernel, so we force this one in.
|
||||
@@ -601,14 +625,9 @@ impl Desegmenter {
|
||||
height: self.default_kernel_segment_height,
|
||||
idx: next_kernel_idx,
|
||||
};
|
||||
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
|
||||
if !self.has_kernel_segment_with_id(seg_id)
|
||||
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
|
||||
{
|
||||
if return_vec.len() >= max_elements {
|
||||
return_vec.pop();
|
||||
}
|
||||
return_vec.push(next_kernel_seg_id);
|
||||
if !self.has_kernel_segment_with_id(seg_id) {
|
||||
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
|
||||
maybe_add_to_request(next_kernel_seg_id);
|
||||
}
|
||||
}
|
||||
if return_vec.is_empty() && self.bitmap_cache.is_some() {
|
||||
|
||||
@@ -316,6 +316,16 @@ impl SyncState {
|
||||
removed_segments
|
||||
}
|
||||
|
||||
/// Drop all tracked PIBD requests, returning how many entries were removed.
|
||||
pub fn clear_pibd_requests(&self) -> usize {
|
||||
let mut requests = self.requested_pibd_segments.write();
|
||||
let cleared = requests.len();
|
||||
if cleared > 0 {
|
||||
requests.clear();
|
||||
}
|
||||
cleared
|
||||
}
|
||||
|
||||
/// Check whether segment is in request list
|
||||
pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
|
||||
self.requested_pibd_segments
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use self::chain::txhashset::{BitmapAccumulator, BitmapSegment};
|
||||
use self::core::core::pmmr::segment::{Segment, SegmentIdentifier};
|
||||
use self::core::ser::{
|
||||
BinReader, BinWriter, DeserializationMode, ProtocolVersion, Readable, Writeable,
|
||||
self, BinReader, BinWriter, DeserializationMode, ProtocolVersion, Readable, Writeable,
|
||||
};
|
||||
use croaring::Bitmap;
|
||||
use grin_chain as chain;
|
||||
@@ -10,6 +10,29 @@ use grin_util::secp::rand::Rng;
|
||||
use rand::thread_rng;
|
||||
use std::io::Cursor;
|
||||
|
||||
fn push_u16(bytes: &mut Vec<u8>, n: u16) {
|
||||
bytes.extend_from_slice(&n.to_be_bytes());
|
||||
}
|
||||
|
||||
fn push_u64(bytes: &mut Vec<u8>, n: u64) {
|
||||
bytes.extend_from_slice(&n.to_be_bytes());
|
||||
}
|
||||
|
||||
fn bitmap_segment_header(height: u8, idx: u64, n_blocks: u16) -> Vec<u8> {
|
||||
let mut bytes = vec![height];
|
||||
push_u64(&mut bytes, idx);
|
||||
push_u16(&mut bytes, n_blocks);
|
||||
bytes
|
||||
}
|
||||
|
||||
fn read_bitmap_segment(bytes: &[u8]) -> Result<BitmapSegment, ser::Error> {
|
||||
ser::deserialize(
|
||||
&mut &bytes[..],
|
||||
ProtocolVersion(1),
|
||||
DeserializationMode::default(),
|
||||
)
|
||||
}
|
||||
|
||||
fn test_roundtrip(entries: usize) {
|
||||
let mut rng = thread_rng();
|
||||
|
||||
@@ -63,7 +86,7 @@ fn test_roundtrip(entries: usize) {
|
||||
assert_eq!(bms, bms2);
|
||||
|
||||
// Convert back to `Segment`
|
||||
let segment2 = Segment::from(bms2);
|
||||
let segment2 = bms2.into_segment().unwrap();
|
||||
assert_eq!(segment, segment2);
|
||||
}
|
||||
|
||||
@@ -83,3 +106,39 @@ fn abundant_segment_ser_roundtrip() {
|
||||
let max = 1 << 16;
|
||||
test_roundtrip(thread_rng().gen_range(max - 4096, max - 1024));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bitmap_segment_read_rejects_empty_blocks() {
|
||||
let bytes = bitmap_segment_header(9, 0, 0);
|
||||
assert_eq!(
|
||||
read_bitmap_segment(&bytes).err(),
|
||||
Some(ser::Error::CorruptedData)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bitmap_segment_read_rejects_too_many_blocks() {
|
||||
let bytes = bitmap_segment_header(9, 0, 9);
|
||||
assert_eq!(
|
||||
read_bitmap_segment(&bytes).err(),
|
||||
Some(ser::Error::TooLargeReadErr)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bitmap_segment_read_rejects_too_large_height() {
|
||||
let bytes = bitmap_segment_header(14, 0, 1);
|
||||
assert_eq!(
|
||||
read_bitmap_segment(&bytes).err(),
|
||||
Some(ser::Error::TooLargeReadErr)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bitmap_segment_read_rejects_offset_overflow() {
|
||||
let bytes = bitmap_segment_header(13, u64::MAX, 1);
|
||||
assert_eq!(
|
||||
read_bitmap_segment(&bytes).err(),
|
||||
Some(ser::Error::TooLargeReadErr)
|
||||
);
|
||||
}
|
||||
|
||||
+5
-5
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_config"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Configuration for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -16,10 +16,10 @@ serde_derive = "1"
|
||||
toml = "0.5"
|
||||
dirs = "2.0"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_servers = { path = "../servers", version = "5.4.0" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_servers = { path = "../servers", version = "5.4.1" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "0.6.1"
|
||||
|
||||
+3
-3
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_core"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -28,8 +28,8 @@ chrono = { version = "0.4.11", features = ["serde"] }
|
||||
zeroize = { version = "1.1", features =["zeroize_derive"] }
|
||||
bytes = "0.5"
|
||||
|
||||
keychain = { package = "grin_keychain", path = "../keychain", version = "5.4.0" }
|
||||
util = { package = "grin_util", path = "../util", version = "5.4.0" }
|
||||
keychain = { package = "grin_keychain", path = "../keychain", version = "5.4.1" }
|
||||
util = { package = "grin_util", path = "../util", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = "1"
|
||||
|
||||
@@ -21,6 +21,39 @@ use croaring::Bitmap;
|
||||
use std::cmp::min;
|
||||
use std::fmt::Debug;
|
||||
|
||||
const MAX_SEGMENT_READ_ITEMS: u64 = 1_000_000;
|
||||
const SEGMENT_READ_PREALLOC_ITEMS: u64 = 1024;
|
||||
|
||||
fn read_segment_item_count<R: Reader>(reader: &mut R) -> Result<u64, Error> {
|
||||
let count = reader.read_u64()?;
|
||||
if count > MAX_SEGMENT_READ_ITEMS {
|
||||
return Err(Error::TooLargeReadErr);
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
fn read_segment_positions<R: Reader>(reader: &mut R, count: u64) -> Result<Vec<u64>, Error> {
|
||||
let mut positions = Vec::with_capacity(min(count, SEGMENT_READ_PREALLOC_ITEMS) as usize);
|
||||
let mut last_pos = 0;
|
||||
for _ in 0..count {
|
||||
let pos = reader.read_u64()?;
|
||||
if pos <= last_pos {
|
||||
return Err(Error::SortError);
|
||||
}
|
||||
last_pos = pos;
|
||||
positions.push(pos - 1);
|
||||
}
|
||||
Ok(positions)
|
||||
}
|
||||
|
||||
fn read_segment_items<T: Readable, R: Reader>(reader: &mut R, count: u64) -> Result<Vec<T>, Error> {
|
||||
let mut items = Vec::with_capacity(min(count, SEGMENT_READ_PREALLOC_ITEMS) as usize);
|
||||
for _ in 0..count {
|
||||
items.push(T::read(reader)?);
|
||||
}
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
/// Possible segment types, according to this desegmenter
|
||||
pub enum SegmentType {
|
||||
@@ -568,39 +601,13 @@ impl<T: Readable> Readable for Segment<T> {
|
||||
fn read<R: Reader>(reader: &mut R) -> Result<Self, Error> {
|
||||
let identifier = Readable::read(reader)?;
|
||||
|
||||
let n_hashes = reader.read_u64()? as usize;
|
||||
let mut hash_pos = Vec::with_capacity(n_hashes);
|
||||
let mut last_pos = 0;
|
||||
for _ in 0..n_hashes {
|
||||
let pos = reader.read_u64()?;
|
||||
if pos <= last_pos {
|
||||
return Err(Error::SortError);
|
||||
}
|
||||
last_pos = pos;
|
||||
hash_pos.push(pos - 1);
|
||||
}
|
||||
let n_hashes = read_segment_item_count(reader)?;
|
||||
let hash_pos = read_segment_positions(reader, n_hashes)?;
|
||||
let hashes = read_segment_items(reader, n_hashes)?;
|
||||
|
||||
let mut hashes = Vec::<Hash>::with_capacity(n_hashes);
|
||||
for _ in 0..n_hashes {
|
||||
hashes.push(Readable::read(reader)?);
|
||||
}
|
||||
|
||||
let n_leaves = reader.read_u64()? as usize;
|
||||
let mut leaf_pos = Vec::with_capacity(n_leaves);
|
||||
last_pos = 0;
|
||||
for _ in 0..n_leaves {
|
||||
let pos = reader.read_u64()?;
|
||||
if pos <= last_pos {
|
||||
return Err(Error::SortError);
|
||||
}
|
||||
last_pos = pos;
|
||||
leaf_pos.push(pos - 1);
|
||||
}
|
||||
|
||||
let mut leaf_data = Vec::<T>::with_capacity(n_leaves);
|
||||
for _ in 0..n_leaves {
|
||||
leaf_data.push(Readable::read(reader)?);
|
||||
}
|
||||
let n_leaves = read_segment_item_count(reader)?;
|
||||
let leaf_pos = read_segment_positions(reader, n_leaves)?;
|
||||
let leaf_data = read_segment_items(reader, n_leaves)?;
|
||||
|
||||
let proof = Readable::read(reader)?;
|
||||
|
||||
@@ -823,12 +830,8 @@ impl SegmentProof {
|
||||
|
||||
impl Readable for SegmentProof {
|
||||
fn read<R: Reader>(reader: &mut R) -> Result<Self, Error> {
|
||||
let n_hashes = reader.read_u64()? as usize;
|
||||
let mut hashes = Vec::with_capacity(n_hashes);
|
||||
for _ in 0..n_hashes {
|
||||
let hash: Hash = Readable::read(reader)?;
|
||||
hashes.push(hash);
|
||||
}
|
||||
let n_hashes = read_segment_item_count(reader)?;
|
||||
let hashes = read_segment_items(reader, n_hashes)?;
|
||||
Ok(Self { hashes })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,10 +16,15 @@ mod common;
|
||||
|
||||
use self::core::core::pmmr;
|
||||
use self::core::core::{Segment, SegmentIdentifier};
|
||||
use self::core::ser::{self, DeserializationMode, ProtocolVersion};
|
||||
use common::TestElem;
|
||||
use grin_core as core;
|
||||
use grin_core::core::pmmr::ReadablePMMR;
|
||||
|
||||
fn push_u64(bytes: &mut Vec<u8>, n: u64) {
|
||||
bytes.extend_from_slice(&n.to_be_bytes());
|
||||
}
|
||||
|
||||
fn test_unprunable_size(height: u8, n_leaves: u32) {
|
||||
let size = 1u64 << height;
|
||||
let n_segments = (n_leaves as u64 + size - 1) / size;
|
||||
@@ -59,3 +64,30 @@ fn unprunable_mmr() {
|
||||
test_unprunable_size(3, i);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn segment_read_rejects_large_hash_count() {
|
||||
let mut bytes = vec![1];
|
||||
push_u64(&mut bytes, 0);
|
||||
push_u64(&mut bytes, 1_000_001);
|
||||
|
||||
let res: Result<Segment<TestElem>, _> = ser::deserialize(
|
||||
&mut &bytes[..],
|
||||
ProtocolVersion(1),
|
||||
DeserializationMode::default(),
|
||||
);
|
||||
assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn segment_proof_read_rejects_large_hash_count() {
|
||||
let mut bytes = vec![];
|
||||
push_u64(&mut bytes, 1_000_001);
|
||||
|
||||
let res: Result<self::core::core::SegmentProof, _> = ser::deserialize(
|
||||
&mut &bytes[..],
|
||||
ProtocolVersion(1),
|
||||
DeserializationMode::default(),
|
||||
);
|
||||
assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr));
|
||||
}
|
||||
|
||||
+2
-2
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_keychain"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -26,4 +26,4 @@ ripemd160 = "0.9"
|
||||
sha2 = "0.9"
|
||||
pbkdf2 = "0.8"
|
||||
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
+6
-6
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_p2p"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -23,13 +23,13 @@ log = "0.4"
|
||||
chrono = { version = "0.4.11", features = ["serde"] }
|
||||
bytes = "0.5"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_store = { path = "../store", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_chain = { path = "../chain", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_store = { path = "../store", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
grin_chain = { path = "../chain", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
grin_pool = { path = "../pool", version = "5.4.0" }
|
||||
grin_pool = { path = "../pool", version = "5.4.1" }
|
||||
|
||||
[build-dependencies]
|
||||
built = { version = "0.8.0", features = ["git2"]}
|
||||
+2
-2
@@ -48,9 +48,9 @@ pub mod built_info {
|
||||
/// Grin's user agent with current version
|
||||
pub fn user_agent() -> String {
|
||||
format!(
|
||||
"MW/Grin {}{}",
|
||||
"MW/Grim {}{}",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
built_info::GIT_COMMIT_HASH_SHORT.map_or_else(|| "".to_owned(), |v| ".".to_owned() + v)
|
||||
built_info::GIT_COMMIT_HASH_SHORT.map_or_else(|| "+".to_owned(), |v| ".".to_owned() + v)
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
+1
-2
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::util::{Mutex, RwLock};
|
||||
use lru_cache::LruCache;
|
||||
use std::fmt;
|
||||
use std::fs::File;
|
||||
use std::net::{Shutdown, TcpStream};
|
||||
@@ -20,8 +21,6 @@ use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use lru_cache::LruCache;
|
||||
|
||||
use crate::chain;
|
||||
use crate::chain::txhashset::BitmapChunk;
|
||||
use crate::conn;
|
||||
|
||||
@@ -45,6 +45,7 @@ pub struct Peers {
|
||||
pub adapter: Arc<dyn ChainAdapter>,
|
||||
store: PeerStore,
|
||||
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
|
||||
blocked: RwLock<HashMap<PeerAddr, (DateTime<Utc>, u32)>>,
|
||||
config: P2PConfig,
|
||||
}
|
||||
|
||||
@@ -55,6 +56,7 @@ impl Peers {
|
||||
store,
|
||||
config,
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
blocked: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -445,6 +447,74 @@ impl Peers {
|
||||
}
|
||||
}
|
||||
|
||||
/// Disconnect a peer without banning it.
|
||||
pub fn disconnect_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
|
||||
let mut peers = self.peers.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
|
||||
error!("disconnect_peer: failed to get peers lock");
|
||||
Error::PeerException
|
||||
})?;
|
||||
match peers.remove(&peer_addr) {
|
||||
Some(peer) => {
|
||||
warn!("disconnecting peer {} ({})", peer_addr, reason);
|
||||
peer.stop();
|
||||
Ok(())
|
||||
}
|
||||
None => Err(Error::PeerNotFound),
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether this peer has been blocked.
|
||||
pub fn is_blocked(&self, peer_addr: PeerAddr) -> bool {
|
||||
match self.blocked.try_read_for(LOCK_TIMEOUT) {
|
||||
Some(peers) => match peers.get(&peer_addr) {
|
||||
None => false,
|
||||
Some((expiry, _)) => expiry > &Utc::now(),
|
||||
},
|
||||
None => {
|
||||
error!("is_blocked: failed to get peers lock");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Temporary block a peer without banning it.
|
||||
pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
|
||||
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
|
||||
error!("block_peer: failed to get blocked lock");
|
||||
Error::PeerException
|
||||
})?;
|
||||
|
||||
let times = {
|
||||
match blocked.get(&peer_addr) {
|
||||
Some((_, times)) => times + 1,
|
||||
None => 1,
|
||||
}
|
||||
};
|
||||
let duration = match times {
|
||||
1 => 60, // 1m
|
||||
2 => 180, // 3m
|
||||
_ => 600, // 10m
|
||||
};
|
||||
let expiry = Utc::now() + Duration::seconds(duration);
|
||||
blocked.insert(peer_addr, (expiry, times));
|
||||
|
||||
warn!(
|
||||
"state_sync: block peer {} ({}) for {} times: {}",
|
||||
peer_addr, reason, duration, times
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unblock all blocked peers.
|
||||
pub fn unblock_peers(&self) -> Result<(), Error> {
|
||||
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
|
||||
error!("unblock_peers: failed to get blocked lock");
|
||||
Error::PeerException
|
||||
})?;
|
||||
blocked.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// We have enough outbound connected peers
|
||||
pub fn enough_outbound_peers(&self) -> bool {
|
||||
self.iter().outbound().connected().count()
|
||||
@@ -832,6 +902,16 @@ impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Custom filter.
|
||||
pub fn with_filter(
|
||||
self,
|
||||
f: impl Fn(&Arc<Peer>) -> bool,
|
||||
) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
|
||||
PeersIter {
|
||||
iter: self.iter.filter(move |p| f(p)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn by_addr(&mut self, addr: PeerAddr) -> Option<Arc<Peer>> {
|
||||
self.iter.find(|p| p.info.addr == addr)
|
||||
}
|
||||
|
||||
+1
-1
@@ -382,7 +382,7 @@ impl MessageHandler for Protocol {
|
||||
block_hash,
|
||||
output_root
|
||||
);
|
||||
adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
|
||||
adapter.receive_bitmap_segment(block_hash, output_root, segment.into_segment()?)?;
|
||||
Consumed::None
|
||||
}
|
||||
Message::OutputSegment(req) => {
|
||||
|
||||
@@ -18,6 +18,7 @@ use grin_p2p as p2p;
|
||||
use grin_util as util;
|
||||
use grin_util::StopState;
|
||||
|
||||
use std::fs;
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time};
|
||||
@@ -44,11 +45,17 @@ fn test_setup() {
|
||||
util::init_test_logger();
|
||||
}
|
||||
|
||||
fn clean_output_dir(dir_name: &str) {
|
||||
let _ = fs::remove_dir_all(dir_name);
|
||||
}
|
||||
|
||||
// Starts a server and connects a client peer to it to check handshake,
|
||||
// followed by a ping/pong exchange to make sure the connection is live.
|
||||
#[test]
|
||||
fn peer_handshake() {
|
||||
test_setup();
|
||||
let test_dir = "target/peer_handshake";
|
||||
clean_output_dir(test_dir);
|
||||
|
||||
let p2p_config = p2p::P2PConfig {
|
||||
host: "127.0.0.1".parse().unwrap(),
|
||||
@@ -60,7 +67,7 @@ fn peer_handshake() {
|
||||
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
||||
let server = Arc::new(
|
||||
p2p::Server::new(
|
||||
".grin",
|
||||
test_dir,
|
||||
p2p::Capabilities::UNKNOWN,
|
||||
p2p_config.clone(),
|
||||
net_adapter.clone(),
|
||||
|
||||
+5
-5
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_pool"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -18,9 +18,9 @@ thiserror = "1"
|
||||
log = "0.4"
|
||||
chrono = "0.4.11"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
grin_chain = { path = "../chain", version = "5.4.0" }
|
||||
grin_chain = { path = "../chain", version = "5.4.1" }
|
||||
|
||||
+9
-9
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_servers"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -27,11 +27,11 @@ async-stream = "0.3"
|
||||
rustls = "0.20"
|
||||
walkdir = "2.3.1"
|
||||
|
||||
grin_api = { path = "../api", version = "5.4.0" }
|
||||
grin_chain = { path = "../chain", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.0" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.0" }
|
||||
grin_pool = { path = "../pool", version = "5.4.0" }
|
||||
grin_store = { path = "../store", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_api = { path = "../api", version = "5.4.1" }
|
||||
grin_chain = { path = "../chain", version = "5.4.1" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_keychain = { path = "../keychain", version = "5.4.1" }
|
||||
grin_p2p = { path = "../p2p", version = "5.4.1" }
|
||||
grin_pool = { path = "../pool", version = "5.4.1" }
|
||||
grin_store = { path = "../store", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
@@ -194,7 +194,8 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
return;
|
||||
}
|
||||
|
||||
if !peers.enough_outbound_peers() {
|
||||
let enough_outbound = peers.enough_outbound_peers();
|
||||
if !enough_outbound {
|
||||
// loop over connected peers that can provide peer lists
|
||||
// ask them for their list of peers
|
||||
let mut connected_peers: Vec<PeerAddr> = vec![];
|
||||
@@ -239,7 +240,8 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
peers.get_connected_peer(p.addr).is_none()
|
||||
&& Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
|
||||
&& (!enough_outbound
|
||||
|| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
|
||||
})
|
||||
.choose_multiple(&mut thread_rng(), max_peer_attempts / 2)
|
||||
{
|
||||
@@ -268,7 +270,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
|
||||
// check min 32 (max 128, if there are no healthy and unknown) random defunct peers no more often than 1 hour per peer.
|
||||
for dp in defuncts
|
||||
.iter()
|
||||
.filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
|
||||
.filter(|p| {
|
||||
!enough_outbound || Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
|
||||
})
|
||||
.choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len())
|
||||
{
|
||||
new_peers.push(&dp.addr);
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use chrono::prelude::{DateTime, Utc};
|
||||
use chrono::Duration;
|
||||
use grin_p2p::PeerAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::chain::{self, pibd_params, SyncState, SyncStatus};
|
||||
@@ -256,6 +257,46 @@ impl StateSync {
|
||||
.sync_state
|
||||
.remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS);
|
||||
|
||||
if !stale_segments.is_empty() {
|
||||
for (seg_id, peer_addr) in stale_segments.iter() {
|
||||
if let Some(peer_addr) = peer_addr {
|
||||
let _ = self
|
||||
.peers
|
||||
.block_peer(PeerAddr(*peer_addr), "PIBD segment timeout");
|
||||
debug!(
|
||||
"state_sync: peer {} moved to PIBD retry exclusion list for segment {:?}",
|
||||
peer_addr, seg_id
|
||||
);
|
||||
let is_outbound = {
|
||||
self.peers
|
||||
.iter()
|
||||
.outbound()
|
||||
.by_addr(PeerAddr(peer_addr.clone()))
|
||||
.is_some()
|
||||
};
|
||||
if is_outbound {
|
||||
debug!("state_sync: disconnecting peer {}", peer_addr);
|
||||
if let Err(e) = self
|
||||
.peers
|
||||
.disconnect_peer(PeerAddr(*peer_addr), "PIBD segment timeout")
|
||||
{
|
||||
debug!(
|
||||
"state_sync: failed to disconnect timed-out peer {}: {:?}",
|
||||
peer_addr, e
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!("state_sync: peer {} is not outbound or not connected, do not disconnect", peer_addr);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: PIBD request {:?} timed out without a recorded peer",
|
||||
seg_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply segments... TODO: figure out how this should be called, might
|
||||
// need to be a separate thread.
|
||||
if let Some(mut de) = desegmenter.try_write() {
|
||||
@@ -318,11 +359,28 @@ impl StateSync {
|
||||
.connected()
|
||||
};
|
||||
|
||||
// Get peers with reasonable height for pibd.
|
||||
let height_slack = pibd_params::PIBD_PEER_HEIGHT_SLACK_BLOCKS;
|
||||
let max_pibd_height = peers_iter_pibd()
|
||||
.into_iter()
|
||||
.map(|p| p.info.height())
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
let available_pibd_peers = || {
|
||||
peers_iter_pibd().with_filter(|p| {
|
||||
p.info.height().saturating_add(height_slack) >= max_pibd_height
|
||||
})
|
||||
};
|
||||
|
||||
// If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute,
|
||||
// abort PIBD and fall back to txhashset download
|
||||
// Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled
|
||||
// peer having the max difficulty
|
||||
if peers_iter_pibd().count() == 0 {
|
||||
if available_pibd_peers()
|
||||
.with_filter(|p| !peers.is_blocked(p.info.addr))
|
||||
.count() == 0
|
||||
{
|
||||
if let None = self.earliest_zero_pibd_peer_time {
|
||||
self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
|
||||
}
|
||||
@@ -339,72 +397,89 @@ impl StateSync {
|
||||
self.set_pibd_aborted();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
self.set_earliest_zero_pibd_peer_time(None)
|
||||
let cleared = self.sync_state.clear_pibd_requests();
|
||||
if cleared > 0 {
|
||||
warn!(
|
||||
"state_sync: cleared {} pending PIBD requests because no PIBD-enabled peers are currently available",
|
||||
cleared
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
self.set_earliest_zero_pibd_peer_time(None);
|
||||
|
||||
// Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible.
|
||||
let excluded_peer = stale_segments
|
||||
.iter()
|
||||
.find(|(stale_id, _)| stale_id == seg_id)
|
||||
.and_then(|(_, addr)| *addr);
|
||||
let peer = peers_iter_pibd()
|
||||
let peer = available_pibd_peers()
|
||||
.outbound()
|
||||
.with_filter(|p| !peers.is_blocked(p.info.addr))
|
||||
.exclude(excluded_peer)
|
||||
.choose_random()
|
||||
.or_else(|| {
|
||||
peers_iter_pibd()
|
||||
available_pibd_peers()
|
||||
.inbound()
|
||||
.with_filter(|p| !peers.is_blocked(p.info.addr))
|
||||
.exclude(excluded_peer)
|
||||
.choose_random()
|
||||
});
|
||||
trace!("Chosen peer is {:?}", peer);
|
||||
|
||||
if let Some(p) = peer {
|
||||
// add to list of segments that are being tracked
|
||||
self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
|
||||
let res = match seg_id.segment_type {
|
||||
SegmentType::Bitmap => p.send_bitmap_segment_request(
|
||||
archive_header.hash(),
|
||||
seg_id.identifier.clone(),
|
||||
),
|
||||
SegmentType::Output => p.send_output_segment_request(
|
||||
archive_header.hash(),
|
||||
seg_id.identifier.clone(),
|
||||
),
|
||||
SegmentType::RangeProof => p.send_rangeproof_segment_request(
|
||||
archive_header.hash(),
|
||||
seg_id.identifier.clone(),
|
||||
),
|
||||
SegmentType::Kernel => p.send_kernel_segment_request(
|
||||
archive_header.hash(),
|
||||
seg_id.identifier.clone(),
|
||||
),
|
||||
};
|
||||
if let Err(e) = res {
|
||||
info!(
|
||||
"Error sending request to peer at {}, reason: {:?}",
|
||||
p.info.addr, e
|
||||
let p = match peer {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
debug!(
|
||||
"state_sync: no eligible PIBD peers available for request {:?}",
|
||||
seg_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// add to list of segments that are being tracked
|
||||
self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
|
||||
|
||||
let res = match seg_id.segment_type {
|
||||
SegmentType::Bitmap => {
|
||||
p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone())
|
||||
}
|
||||
SegmentType::Output => {
|
||||
p.send_output_segment_request(archive_header.hash(), seg_id.identifier.clone())
|
||||
}
|
||||
SegmentType::RangeProof => p.send_rangeproof_segment_request(
|
||||
archive_header.hash(),
|
||||
seg_id.identifier.clone(),
|
||||
),
|
||||
SegmentType::Kernel => {
|
||||
p.send_kernel_segment_request(archive_header.hash(), seg_id.identifier.clone())
|
||||
}
|
||||
};
|
||||
if let Err(e) = res {
|
||||
info!(
|
||||
"Error sending request to peer at {}, reason: {:?}",
|
||||
p.info.addr, e
|
||||
);
|
||||
self.sync_state.remove_pibd_segment(seg_id);
|
||||
} else if let Some(prev_peer) = excluded_peer {
|
||||
if p.info.addr.0 != prev_peer {
|
||||
info!(
|
||||
"state_sync: retrying segment {:?} with new peer {} (previously {})",
|
||||
seg_id, p.info.addr, prev_peer
|
||||
);
|
||||
self.sync_state.remove_pibd_segment(seg_id);
|
||||
} else if let Some(prev_peer) = excluded_peer {
|
||||
if p.info.addr.0 != prev_peer {
|
||||
info!(
|
||||
"state_sync: retrying segment {:?} with new peer {} (previously {})",
|
||||
seg_id, p.info.addr, prev_peer
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: requested segment {:?} from peer {}",
|
||||
seg_id, p.info.addr
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: requested segment {:?} from peer {}",
|
||||
seg_id, p.info.addr
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"state_sync: requested segment {:?} from peer {}",
|
||||
seg_id, p.info.addr
|
||||
);
|
||||
}
|
||||
}
|
||||
false
|
||||
@@ -495,6 +570,7 @@ impl StateSync {
|
||||
}
|
||||
|
||||
fn state_sync_reset(&mut self) {
|
||||
let _ = self.peers.unblock_peers();
|
||||
self.prev_state_sync = None;
|
||||
self.state_sync_peer = None;
|
||||
}
|
||||
|
||||
+3
-3
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_store"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
@@ -21,8 +21,8 @@ serde_derive = "1"
|
||||
thiserror = "1"
|
||||
log = "0.4"
|
||||
|
||||
grin_core = { path = "../core", version = "5.4.0" }
|
||||
grin_util = { path = "../util", version = "5.4.0" }
|
||||
grin_core = { path = "../core", version = "5.4.1" }
|
||||
grin_util = { path = "../util", version = "5.4.1" }
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.4.11"
|
||||
|
||||
+81
-80
@@ -228,25 +228,28 @@ impl Store {
|
||||
}
|
||||
|
||||
// Database setup.
|
||||
let r_env_map = env_map.read();
|
||||
let env = r_env_map.get(&full_path).unwrap().env.clone();
|
||||
let mut write = env.write_txn()?;
|
||||
let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME);
|
||||
let def_db = env.create_database(&mut write, Some(def_name))?;
|
||||
let mut dbs_map = HashMap::<u8, Database<Bytes, Bytes>>::new();
|
||||
for p in prefixes {
|
||||
let db = env.create_database(&mut write, Some(p.to_string().as_str()))?;
|
||||
dbs_map.insert(p, db);
|
||||
}
|
||||
write.commit()?;
|
||||
let s = {
|
||||
let r_env_map = env_map.read();
|
||||
let env = r_env_map.get(&full_path).unwrap().env.clone();
|
||||
let mut write = env.write_txn()?;
|
||||
let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME);
|
||||
let def_db = env.create_database(&mut write, Some(def_name))?;
|
||||
let mut dbs_map = HashMap::<u8, Database<Bytes, Bytes>>::new();
|
||||
for p in prefixes {
|
||||
let db = env.create_database(&mut write, Some(p.to_string().as_str()))?;
|
||||
dbs_map.insert(p, db);
|
||||
}
|
||||
write.commit()?;
|
||||
|
||||
let s = Store {
|
||||
env: env.clone(),
|
||||
env_path: full_path.clone(),
|
||||
pre_dbs: Arc::new(dbs_map),
|
||||
def_db,
|
||||
version: DEFAULT_DB_VERSION,
|
||||
alloc_chunk_size,
|
||||
let s = Store {
|
||||
env: env.clone(),
|
||||
env_path: full_path.clone(),
|
||||
pre_dbs: Arc::new(dbs_map),
|
||||
def_db,
|
||||
version: DEFAULT_DB_VERSION,
|
||||
alloc_chunk_size,
|
||||
};
|
||||
s
|
||||
};
|
||||
|
||||
// Migrate to default environment if needed.
|
||||
@@ -632,12 +635,9 @@ impl Store {
|
||||
Ok(read) => {
|
||||
let db_res = self.get_db(db_key);
|
||||
match db_res {
|
||||
Ok(db) => Ok(DatabaseIterator::new(
|
||||
self,
|
||||
Arc::new(db.clone()),
|
||||
read,
|
||||
deserialize,
|
||||
)),
|
||||
Ok(db) => {
|
||||
DatabaseIterator::new(self, Arc::new(db.clone()), read, deserialize)
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
@@ -794,12 +794,12 @@ impl<'a> Batch<'a> {
|
||||
Ok(read) => {
|
||||
let db_res = self.store.get_db(db_key);
|
||||
match db_res {
|
||||
Ok(db) => Ok(DatabaseIterator::new(
|
||||
Ok(db) => DatabaseIterator::new(
|
||||
self.store,
|
||||
Arc::new(db.clone()),
|
||||
read,
|
||||
deserialize,
|
||||
)),
|
||||
),
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
@@ -876,9 +876,9 @@ where
|
||||
db: Arc<Database<Bytes, Bytes>>,
|
||||
read: Arc<RoTxn<'a, WithoutTls>>,
|
||||
keys: Vec<Vec<u8>>,
|
||||
total_keys: usize,
|
||||
skip_cur: usize,
|
||||
skip_total: usize,
|
||||
done: bool,
|
||||
deserialize: F,
|
||||
#[allow(dead_code)]
|
||||
tx_counter: TxCounter,
|
||||
@@ -891,44 +891,37 @@ where
|
||||
type Item = Result<T, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
|
||||
self.skip_total += 1;
|
||||
self.skip_cur += 1;
|
||||
match self.db.get(&self.read, k) {
|
||||
Ok(v) => {
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => Some(Ok(v)),
|
||||
Err(e) => {
|
||||
error!("db iter: error deserializing: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return {
|
||||
error!("db iter: error read value: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
loop {
|
||||
if self.done {
|
||||
return None;
|
||||
} else if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
|
||||
self.skip_total += 1;
|
||||
self.skip_cur += 1;
|
||||
match self.db.get(&self.read, k) {
|
||||
Ok(v) => {
|
||||
if let Some(v) = v {
|
||||
return match (self.deserialize)(k, v) {
|
||||
Ok(v) => Some(Ok(v)),
|
||||
Err(e) => {
|
||||
error!("db iter: error deserializing: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return {
|
||||
error!("db iter: error read value: {}", e);
|
||||
Some(Err(Error::from(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Err(e) = self.load_next_keys() {
|
||||
error!("db iter: error read keys: {}", e);
|
||||
self.done = true;
|
||||
return Some(Err(e));
|
||||
}
|
||||
} else if self.total_keys > self.skip_total {
|
||||
let keys = if let Ok(iter) = self.db.iter(&self.read) {
|
||||
iter.move_between_keys()
|
||||
.skip(self.skip_total)
|
||||
.take(10000)
|
||||
.filter(|kv| kv.is_ok())
|
||||
.map(|kv| kv.unwrap().0.to_vec())
|
||||
.collect::<Vec<Vec<u8>>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
self.skip_cur = 0;
|
||||
self.keys = keys;
|
||||
return self.next();
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -942,34 +935,42 @@ where
|
||||
db: Arc<Database<Bytes, Bytes>>,
|
||||
read: RoTxn<'a, WithoutTls>,
|
||||
deserialize: F,
|
||||
) -> DatabaseIterator<'a, F, T> {
|
||||
let (keys, total_keys) = if let Ok(iter) = db.iter(&read) {
|
||||
let total = iter.move_between_keys().count();
|
||||
let keys = if let Ok(iter) = db.iter(&read) {
|
||||
iter.move_between_keys()
|
||||
.take(10000)
|
||||
.filter(|kv| kv.is_ok())
|
||||
.map(|kv| kv.unwrap().0.to_vec())
|
||||
.collect::<Vec<Vec<u8>>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
(keys, total)
|
||||
} else {
|
||||
(vec![], 0)
|
||||
};
|
||||
DatabaseIterator {
|
||||
) -> Result<DatabaseIterator<'a, F, T>, Error> {
|
||||
// load keys before constructing tx_counter to avoid double-decrementing open_txs_count on error
|
||||
let keys = Self::read_key_page(&db, &read, 0)?;
|
||||
let done = keys.is_empty();
|
||||
Ok(DatabaseIterator {
|
||||
db,
|
||||
read: Arc::new(read),
|
||||
keys,
|
||||
total_keys,
|
||||
skip_cur: 0,
|
||||
skip_total: 0,
|
||||
done,
|
||||
deserialize,
|
||||
tx_counter: TxCounter {
|
||||
env_path: store.env_path.clone(),
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn load_next_keys(&mut self) -> Result<(), Error> {
|
||||
self.keys = Self::read_key_page(&self.db, &self.read, self.skip_total)?;
|
||||
self.skip_cur = 0;
|
||||
self.done = self.keys.is_empty();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_key_page(
|
||||
db: &Database<Bytes, Bytes>,
|
||||
read: &RoTxn<'a, WithoutTls>,
|
||||
skip: usize,
|
||||
) -> Result<Vec<Vec<u8>>, Error> {
|
||||
let iter = db.iter(read)?;
|
||||
iter.move_between_keys()
|
||||
.skip(skip)
|
||||
.take(10000)
|
||||
.map(|kv| kv.map(|(k, _)| k.to_vec()).map_err(Error::from))
|
||||
.collect::<Result<Vec<Vec<u8>>, Error>>()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -140,6 +140,32 @@ fn test_iter() -> Result<(), store::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_pages() -> Result<(), store::Error> {
|
||||
let test_dir = "target/test_iter_pages";
|
||||
setup(test_dir);
|
||||
|
||||
let prefix = b'P';
|
||||
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None, None)?;
|
||||
|
||||
{
|
||||
let mut batch = store.batch()?;
|
||||
for i in 0..10_001u32 {
|
||||
batch.put(Some(prefix), &i.to_be_bytes(), &[1])?;
|
||||
}
|
||||
batch.commit()?;
|
||||
}
|
||||
|
||||
let count = store
|
||||
.iter(Some(prefix), |_, v| Ok(v.to_vec()))?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.len();
|
||||
assert_eq!(count, 10_001);
|
||||
|
||||
clean_output_dir(test_dir);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lmdb_allocate() -> Result<(), store::Error> {
|
||||
let test_dir = "target/lmdb_allocate";
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "grin_util"
|
||||
version = "5.4.0"
|
||||
version = "5.4.1"
|
||||
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
|
||||
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
|
||||
license = "Apache-2.0"
|
||||
|
||||
Reference in New Issue
Block a user