Compare commits

...

62 Commits

Author SHA1 Message Date
ardocrat bce5a7144b Merge branch 'master' into grim
Continuous Integration / Linux Tests (api) (push) Has been cancelled
Continuous Integration / Linux Tests (chain) (push) Has been cancelled
Continuous Integration / Linux Tests (core) (push) Has been cancelled
Continuous Integration / Linux Tests (keychain) (push) Has been cancelled
Continuous Integration / Linux Tests (p2p) (push) Has been cancelled
Continuous Integration / Linux Tests (pool) (push) Has been cancelled
Continuous Integration / Linux Tests (servers) (push) Has been cancelled
Continuous Integration / Linux Tests (src) (push) Has been cancelled
Continuous Integration / Linux Tests (store) (push) Has been cancelled
Continuous Integration / Linux Tests (util) (push) Has been cancelled
Continuous Integration / macOS Tests (push) Has been cancelled
Continuous Integration / Windows Tests (push) Has been cancelled
# Conflicts:
#	p2p/Cargo.toml
2026-06-11 10:35:27 +03:00
ardocrat b41a0cc209 Merge branch 'staging' into grim
# Conflicts:
#	store/src/lmdb.rs
2026-06-11 10:34:22 +03:00
David Burkett a25fde9674 Bump version to v5.4.1 2026-06-10 20:50:21 -04:00
wiesche ee4390e437 Clarifies the blinding factor range proof explanation and includes the remaining typo fixes (#3840) 2026-06-10 20:42:32 -04:00
David Burkett 68053a203e Add fountainoffairfortune seeds 2026-06-10 20:27:40 -04:00
David Burkett 62e5ace442 pibd: bound segment decoding allocations (#3850) 2026-06-10 12:02:01 -04:00
wiesche cf2ed3f3be propagate lmdb iterator page errors (#3849) 2026-06-07 21:57:43 +02:00
ardocrat 386ac1ed5c Merge remote-tracking branch 'grin_ardocrat/lmdb_update' into grim 2026-06-04 17:47:57 +03:00
ardocrat 110e0e143f Migrate from lmdb-zero to heed (#3825)
* db: migrate from lmdb-zero to heed

* fix: check resizing operation and wait to avoid crash with multiple batches, fix exists check at batch

* build: fix missing deps at Cargo.lock

* lmdb: single environment, migrate existing databases with provided non-default environment name

* fix: revert chunk size to 128mb

* lmdb: ability to use multiple shared environments

* build: remove unused dependency

* fix: resize to have correct multiplier of the system page size

* lmdb: speed up prefix iter by storing keys

* lmdb: default env name

* lmdb: wait db resize before read, reduce timeout before resizing

* lmdb: use static reader for iterator, count existing batches for stable resize

* fix: check batches count on resize waiting

* lmdb: use separate databases instead of prefixes, use default db for values without prefixes, migrate old environment

* fix: pop pos key

* lmdb: count all open transactions to finish before resizing

* lmdb: immediate resize if there are no open transactions

* lmdb: remove env state when there are no more stores

* lmdb: use atomic for resize and resize checking flags

* lmdb: sleep 10ms when waiting all opened txs to be closed

* lmdb: use atomic open txs and stores count

* lmdb: use index to detect separator, ignore unknown db key to not have a panic

* lmdb: store max 10k keys in the iterator

* lmdb: check iter result on getting total

* lmdb: handle errors at iterator

* lmdb: handle an error when db with provided key not found

* lmdb: fix iterate over 10k keys

* lmdb: document migration resize safety

* lmdb: fix iter test

* lmdb: clear new db after unsuccessful migration, handle read error on migration to interrupt process

* store: bring back old key methods to reproduce data migration

* lmdb: return an error on unsuccessful migration

* lmdb: migration test, clean data after allocate test

* lmdb: info migration log

* fix: move iterator before handling an error to allow skip bad value

* lmdb: return an error if removal of old DB file failed after migration

* lmdb: lifetime for iterator, use write transaction at batch iterator

* lmdb: migration progress

* fix: tests

* lmdb: immediately set resizing flag, ignore resizing flag while there are more than 0 opened txs to avoid stuck, optimize tx counter for some operations

* lmdb: key for successful migration

* lmdb: fix put database creation at separate block to avoid lifetime issues when returning an error on migration
2026-06-02 13:53:53 +02:00
ardocrat 0dc4280b68 lmdb: fix put database creation at separate block to avoid lifetime issues when returning an error on migration 2026-06-02 12:31:56 +03:00
ardocrat f3796fa27b Merge branch 'lmdb_update' into grim 2026-05-26 16:30:10 +03:00
ardocrat dc7b49004d Merge branch 'lmdb_update' into grim 2026-05-20 20:08:28 +03:00
ardocrat 22e34311b8 Merge branch 'pibd_peers_fix' into grim 2026-05-20 20:05:52 +03:00
ardocrat a81f59926c lmdb: do not include blocked peers in selection of peer, count also blocked peers to use fallback .zip download 2026-05-15 15:14:47 +03:00
ardocrat 20e4e845a1 ci: github grim branch docker image 2026-05-14 22:59:11 +03:00
ardocrat 2d0ce2d526 Merge branch 'rust_edition' into grim
# Conflicts:
#	.github/workflows/publish-ghcr.yaml.yml
#	p2p/Cargo.toml
#	p2p/src/msg.rs
#	p2p/tests/peer_handshake.rs
2026-05-14 22:52:52 +03:00
ardocrat c4f054ab51 sync: blocked filter 2026-05-14 22:34:22 +03:00
ardocrat 048453f989 peers: keep blocked list into memory till restart or pibd finish 2026-05-14 22:32:01 +03:00
ardocrat 80cba965ab cargo: format all 2026-05-14 13:04:48 +03:00
ardocrat 86eaa7b7ac fix: add missed build to grin_p2p from staging branch 2026-05-14 12:57:01 +03:00
ardocrat fde914d11a Merge branch 'staging' into rust_edition 2026-05-14 12:33:41 +03:00
ardocrat 473f9f33ad build: update rust edition 2021 2026-05-14 11:57:20 +03:00
ardocrat 0321bf5b14 fix: peers iterator to unblock blocked 2026-05-14 11:54:54 +03:00
ardocrat be37f46f59 fix: add segment to request if not exists 2026-05-14 11:26:22 +03:00
ardocrat 3a37d24dca Merge branch 'ci_include_git_dir' into grim 2026-05-02 23:18:32 +03:00
ardocrat 114c0eefa4 ci: include .git directory into build 2026-05-02 23:10:48 +03:00
ardocrat fb87a26d3f p2p: fix test for user agent 2026-05-02 21:14:36 +03:00
ardocrat f5d59e595e ci: ghcr build from grim branch 2026-05-02 20:56:16 +03:00
ardocrat 5bebacd605 node: include git hash into user agent 2026-05-02 20:55:32 +03:00
ardocrat 95486a4bc0 node: custom agent test 2026-05-01 12:44:45 +03:00
ardocrat e6265a810d Merge branch 'pibd_peers_fix' into grim
# Conflicts:
#	servers/src/grin/seed.rs
2026-05-01 12:36:43 +03:00
ardocrat 5fb9c5badf Merge branch 'pibd_fixes' into grim 2026-05-01 12:35:49 +03:00
ardocrat 0ea779e777 node: custom agent 2026-05-01 12:31:52 +03:00
ardocrat bd63c266e2 Merge branch 'lmdb_update' into grim 2026-05-01 01:31:18 +03:00
ardocrat f80e450896 Merge branch 'lmdb_update' into grim 2026-04-29 23:37:54 +03:00
ardocrat 1832e1e907 Merge branch 'lmdb_update' into grim 2026-04-29 22:26:48 +03:00
ardocrat 2164f6098f Merge branch 'lmdb_update' into grim 2026-04-27 22:28:14 +03:00
ardocrat 0bedfb39d7 Merge branch 'lmdb_update' into grim 2026-04-24 18:26:02 +03:00
ardocrat 13cfef93ec Merge branch 'lmdb_update' into grim 2026-04-24 18:14:55 +03:00
ardocrat 36211eef59 Merge branch 'lmdb_update' into grim 2026-04-24 13:46:12 +03:00
ardocrat 03ddb5f118 pibd: check if segment not exists at request when selecting next required 2026-04-23 00:52:16 +03:00
ardocrat 29f822f298 pibd: do not check for max cached segments on selecting next desired segment for request 2026-04-22 23:39:56 +03:00
ardocrat 3d14a54d69 Merge branch 'lmdb_update' into grim 2026-04-22 20:28:23 +03:00
ardocrat 769da6dbdd p2p: ignore last connection attempt when there is not enough outbound peers 2026-04-22 11:43:35 +03:00
ardocrat dfac42618c Merge branch 'pibd_peers_fix' into grim 2026-04-22 11:28:31 +03:00
ardocrat b41f0bdbd4 p2p: ignore last connection attempt when there is not enough outbound peers 2026-04-22 11:11:20 +03:00
ardocrat ab907508c7 Merge branch 'master' into pibd_peers_fix 2026-04-22 11:08:02 +03:00
ardocrat 3d8cb52995 Merge branch 'refs/heads/lmdb_update' into grim 2026-04-21 22:33:18 +03:00
ardocrat f47bf935ab Merge branch 'refs/heads/lmdb_update' into grim 2026-04-20 21:22:51 +03:00
ardocrat 0c3e18a913 Merge branch 'refs/heads/lmdb_update' into grim 2026-04-16 14:54:33 +03:00
ardocrat 42b928a42f Merge branch 'refs/heads/master' into grim
# Conflicts:
#	servers/src/grin/seed.rs
2026-04-09 20:50:44 +03:00
ardocrat cc555a22fb Merge branch 'refs/heads/peers_fix' into grim 2026-04-09 04:40:16 +03:00
ardocrat b172256d0b peer: update last_attempt when changing peer state to other than Banned 2026-04-09 04:22:02 +03:00
ardocrat 1258b844f3 p2p: do not check healthy and defunct peers more often than once per hour, store last connection attempt, do not ask for more peers when there is enough outbound 2026-04-09 04:15:19 +03:00
ardocrat 91f6ddc8c3 p2p: reduced amount of total peers to check at monitor 2026-04-08 22:11:11 +03:00
ardocrat 3787881673 Merge remote-tracking branch 'grin_ardocrat/peers_fix' into grim 2026-04-06 02:27:21 +03:00
ardocrat 9585c02135 p2p: cleanup before selection at monitor, add outbound to connected list only when there is not enough peers + disconnect extra peer immediately, reconnect to seeds at monitor to avoid stuck, update only defunct state to unknown when received existing peer address 2026-04-06 01:44:22 +03:00
ardocrat ab9715d9a9 peer: unknown state for new peers, check peers state on every monitor (128 healthy non-connected + 128 defuncts + 128 unknown), mark peer as defunct when ping not passed, do not crash on toml parse with dns failure 2026-04-04 01:58:35 +03:00
ardocrat 464babc7ef peer: block only non-blocked to not increase times for several requests 2026-03-30 23:09:09 +03:00
ardocrat 4511fd89a2 fix: add missing method clear_pibd_requests to commit 2026-03-30 15:34:18 +03:00
ardocrat d9bca1c776 pibd: choose peers based on minimal height, temporary block peers for stale segments disconnecting only outbound, force request for output and rangeproof segments to avoid stuck at this case 2026-03-30 15:26:29 +03:00
ardocrat 397b6cd116 pibd: fix check for next required kernel segment 2026-03-22 13:14:08 +03:00
29 changed files with 775 additions and 375 deletions
+1 -1
View File
@@ -2,7 +2,7 @@ name: Build and Push to GHCR
on:
push:
branches: [master, staging]
branches: [grim]
env:
GHCR_IMAGE: ghcr.io/${{ secrets.GHCR_USERNAME }}/${{ github.event.repository.name }}
Generated
+11 -11
View File
@@ -957,7 +957,7 @@ dependencies = [
[[package]]
name = "grin"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"blake2-rfc",
"built",
@@ -987,7 +987,7 @@ dependencies = [
[[package]]
name = "grin_api"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"async-stream",
"bytes 1.7.1",
@@ -1020,7 +1020,7 @@ dependencies = [
[[package]]
name = "grin_chain"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"bit-vec",
"bitflags 1.3.2",
@@ -1044,7 +1044,7 @@ dependencies = [
[[package]]
name = "grin_config"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"dirs",
"grin_core",
@@ -1060,7 +1060,7 @@ dependencies = [
[[package]]
name = "grin_core"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"blake2-rfc",
"byteorder",
@@ -1086,7 +1086,7 @@ dependencies = [
[[package]]
name = "grin_keychain"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"blake2-rfc",
"byteorder",
@@ -1107,7 +1107,7 @@ dependencies = [
[[package]]
name = "grin_p2p"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"bitflags 1.3.2",
"built",
@@ -1130,7 +1130,7 @@ dependencies = [
[[package]]
name = "grin_pool"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"blake2-rfc",
"chrono",
@@ -1162,7 +1162,7 @@ dependencies = [
[[package]]
name = "grin_servers"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"async-stream",
"chrono",
@@ -1192,7 +1192,7 @@ dependencies = [
[[package]]
name = "grin_store"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"byteorder",
"chrono",
@@ -1214,7 +1214,7 @@ dependencies = [
[[package]]
name = "grin_util"
version = "5.4.0"
version = "5.4.1"
dependencies = [
"anyhow",
"backtrace",
+12 -12
View File
@@ -1,6 +1,6 @@
[package]
name = "grin"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -34,15 +34,15 @@ serde_json = "1"
log = "0.4"
term = "0.6"
grin_api = { path = "./api", version = "5.4.0" }
grin_config = { path = "./config", version = "5.4.0" }
grin_chain = { path = "./chain", version = "5.4.0" }
grin_core = { path = "./core", version = "5.4.0" }
grin_keychain = { path = "./keychain", version = "5.4.0" }
grin_p2p = { path = "./p2p", version = "5.4.0" }
grin_servers = { path = "./servers", version = "5.4.0" }
grin_util = { path = "./util", version = "5.4.0" }
grin_store = { path = "./store", version = "5.4.0" }
grin_api = { path = "./api", version = "5.4.1" }
grin_config = { path = "./config", version = "5.4.1" }
grin_chain = { path = "./chain", version = "5.4.1" }
grin_core = { path = "./core", version = "5.4.1" }
grin_keychain = { path = "./keychain", version = "5.4.1" }
grin_p2p = { path = "./p2p", version = "5.4.1" }
grin_servers = { path = "./servers", version = "5.4.1" }
grin_util = { path = "./util", version = "5.4.1" }
grin_store = { path = "./store", version = "5.4.1" }
[dependencies.cursive]
version = "0.21"
@@ -53,5 +53,5 @@ features = ["pancurses-backend"]
built = { version = "0.8.0", features = ["git2"]}
[dev-dependencies]
grin_chain = { path = "./chain", version = "5.4.0" }
grin_store = { path = "./store", version = "5.4.0" }
grin_chain = { path = "./chain", version = "5.4.1" }
grin_store = { path = "./store", version = "5.4.1" }
+7 -7
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_api"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "APIs for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -32,9 +32,9 @@ async-stream = "0.3"
url = "2.1"
bytes = "1"
grin_core = { path = "../core", version = "5.4.0" }
grin_chain = { path = "../chain", version = "5.4.0" }
grin_p2p = { path = "../p2p", version = "5.4.0" }
grin_pool = { path = "../pool", version = "5.4.0" }
grin_store = { path = "../store", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_chain = { path = "../chain", version = "5.4.1" }
grin_p2p = { path = "../p2p", version = "5.4.1" }
grin_pool = { path = "../pool", version = "5.4.1" }
grin_store = { path = "../store", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
+5 -5
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_chain"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -23,10 +23,10 @@ chrono = "0.4.11"
lru-cache = "0.1"
lazy_static = "1"
grin_core = { path = "../core", version = "5.4.0" }
grin_keychain = { path = "../keychain", version = "5.4.0" }
grin_store = { path = "../store", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_keychain = { path = "../keychain", version = "5.4.1" }
grin_store = { path = "../store", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
[dev-dependencies]
env_logger = "0.7"
+3
View File
@@ -43,6 +43,9 @@ pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20;
/// will always be requested first)
pub const SEGMENT_REQUEST_COUNT: usize = 15;
/// How many blocks behind the tip a PIBD peer may be and still be considered usable.
pub const PIBD_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2;
/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
/// give up and revert back to the txhashset.zip download method
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60;
+115 -34
View File
@@ -280,6 +280,102 @@ pub struct BitmapSegment {
proof: SegmentProof,
}
impl BitmapSegment {
// Matches the upper end of the currently served PIBD bitmap segment range.
const MAX_SEGMENT_HEIGHT: u8 = 13;
fn max_chunks(identifier: &SegmentIdentifier) -> Result<usize, ser::Error> {
if identifier.height > Self::MAX_SEGMENT_HEIGHT {
return Err(ser::Error::TooLargeReadErr);
}
1usize
.checked_shl(identifier.height as u32)
.ok_or(ser::Error::TooLargeReadErr)
}
fn leaf_offset(identifier: &SegmentIdentifier) -> Result<u64, ser::Error> {
let segment_capacity = 1u64
.checked_shl(identifier.height as u32)
.ok_or(ser::Error::TooLargeReadErr)?;
segment_capacity
.checked_mul(identifier.idx)
.ok_or(ser::Error::TooLargeReadErr)
}
fn n_chunks(blocks: &[BitmapBlock]) -> Result<usize, ser::Error> {
let (last, full_blocks) = blocks.split_last().ok_or(ser::Error::CorruptedData)?;
for block in full_blocks {
if block.try_n_chunks()? != BitmapBlock::NCHUNKS {
return Err(ser::Error::CorruptedData);
}
}
let last_chunks = last.try_n_chunks()?;
if last_chunks == 0 {
return Err(ser::Error::CorruptedData);
}
full_blocks
.len()
.checked_mul(BitmapBlock::NCHUNKS)
.and_then(|n| n.checked_add(last_chunks))
.ok_or(ser::Error::TooLargeReadErr)
}
fn validate_blocks(
identifier: &SegmentIdentifier,
blocks: &[BitmapBlock],
) -> Result<usize, ser::Error> {
let offset = Self::leaf_offset(identifier)?;
let n_chunks = Self::n_chunks(blocks)?;
if n_chunks > Self::max_chunks(identifier)? {
return Err(ser::Error::TooLargeReadErr);
}
offset
.checked_add((n_chunks - 1) as u64)
.ok_or(ser::Error::TooLargeReadErr)?;
Ok(n_chunks)
}
/// Convert this bitmap segment into a PMMR segment, validating its encoded shape.
pub fn into_segment(self) -> Result<Segment<BitmapChunk>, ser::Error> {
let BitmapSegment {
identifier,
blocks,
proof,
} = self;
let n_chunks = Self::validate_blocks(&identifier, &blocks)?;
let mut leaf_pos = Vec::with_capacity(n_chunks);
let mut chunks = Vec::with_capacity(n_chunks);
let offset = Self::leaf_offset(&identifier)?;
for i in 0..(n_chunks as u64) {
let insertion_idx = offset.checked_add(i).ok_or(ser::Error::TooLargeReadErr)?;
leaf_pos.push(pmmr::insertion_to_pmmr_index(insertion_idx));
chunks.push(BitmapChunk::new());
}
for (block_idx, block) in blocks.into_iter().enumerate() {
block.try_n_chunks()?;
let offset = block_idx * BitmapBlock::NCHUNKS;
for (i, _) in block.inner.iter().enumerate().filter(|&(_, v)| v) {
chunks
.get_mut(offset + i / BitmapChunk::LEN_BITS)
.ok_or(ser::Error::CorruptedData)?
.0
.set(i % BitmapChunk::LEN_BITS, true);
}
}
Ok(Segment::from_parts(
identifier,
Vec::new(),
Vec::new(),
leaf_pos,
chunks,
proof,
))
}
}
impl Writeable for BitmapSegment {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
Writeable::write(&self.identifier, writer)?;
@@ -297,10 +393,20 @@ impl Readable for BitmapSegment {
let identifier: SegmentIdentifier = Readable::read(reader)?;
let n_blocks = reader.read_u16()? as usize;
if n_blocks == 0 {
return Err(ser::Error::CorruptedData);
}
let max_blocks = (BitmapSegment::max_chunks(&identifier)? + BitmapBlock::NCHUNKS - 1)
/ BitmapBlock::NCHUNKS;
if n_blocks > max_blocks {
return Err(ser::Error::TooLargeReadErr);
}
BitmapSegment::leaf_offset(&identifier)?;
let mut blocks = Vec::<BitmapBlock>::with_capacity(n_blocks);
for _ in 0..n_blocks {
blocks.push(Readable::read(reader)?);
}
BitmapSegment::validate_blocks(&identifier, &blocks)?;
let proof = Readable::read(reader)?;
Ok(Self {
@@ -348,36 +454,7 @@ impl From<Segment<BitmapChunk>> for BitmapSegment {
// TODO: this can be sped up with some `unsafe` code
impl From<BitmapSegment> for Segment<BitmapChunk> {
fn from(segment: BitmapSegment) -> Self {
let BitmapSegment {
identifier,
blocks,
proof,
} = segment;
// Count the number of chunks taking into account that the final block might be smaller
let n_chunks = (blocks.len() - 1) * BitmapBlock::NCHUNKS
+ blocks.last().map(|b| b.n_chunks()).unwrap_or(0);
let mut leaf_pos = Vec::with_capacity(n_chunks);
let mut chunks = Vec::with_capacity(n_chunks);
let offset = (1 << identifier.height) * identifier.idx;
for i in 0..(n_chunks as u64) {
leaf_pos.push(pmmr::insertion_to_pmmr_index(offset + i));
chunks.push(BitmapChunk::new());
}
for (block_idx, block) in blocks.into_iter().enumerate() {
assert!(block.inner.len() <= BitmapBlock::NBITS as usize);
let offset = block_idx * BitmapBlock::NCHUNKS;
for (i, _) in block.inner.iter().enumerate().filter(|&(_, v)| v) {
chunks
.get_mut(offset + i / BitmapChunk::LEN_BITS)
.unwrap()
.0
.set(i % BitmapChunk::LEN_BITS, true);
}
}
Segment::from_parts(identifier, Vec::new(), Vec::new(), leaf_pos, chunks, proof)
segment.into_segment().expect("valid bitmap segment")
}
}
@@ -401,12 +478,16 @@ impl BitmapBlock {
}
}
fn n_chunks(&self) -> usize {
fn try_n_chunks(&self) -> Result<usize, ser::Error> {
let length = self.inner.len();
assert_eq!(length % BitmapChunk::LEN_BITS, 0);
if length % BitmapChunk::LEN_BITS != 0 {
return Err(ser::Error::CorruptedData);
}
let n_chunks = length / BitmapChunk::LEN_BITS;
assert!(n_chunks <= BitmapBlock::NCHUNKS);
n_chunks
if n_chunks > BitmapBlock::NCHUNKS {
return Err(ser::Error::TooLargeReadErr);
}
Ok(n_chunks)
}
}
+118 -99
View File
@@ -497,102 +497,126 @@ impl Desegmenter {
}
}
}
} else {
// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments, evenly spreading requests
// among MMRs
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
let total_output_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
let mut elems_added = 0;
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
while (next_output_idx as usize) < total_output_segments {
if self.output_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let output_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
next_output_idx += 1;
}
}
// Bitmap is not finished yet, continue at next iteration when it will be ready.
return return_vec;
}
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
elems_added = 0;
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
while (next_rp_idx as usize) < total_rangeproof_segments {
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let rp_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let (_first, last) =
rp_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_rangeproof_mmr_size
&& !self.has_rangeproof_segment_with_id(rp_id)
{
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
next_rp_idx += 1;
// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments, evenly spreading requests
// among MMRs
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
let total_output_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
let mut elems_added = 0;
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
while (next_output_idx as usize) < total_output_segments {
if elems_added == max_elements / 3 {
break;
}
}
let total_kernel_segments = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
elems_added = 0;
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
while (next_kernel_idx as usize) < total_kernel_segments {
if self.kernel_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let k_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let (_first, last) =
k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
next_kernel_idx += 1;
let output_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
next_output_idx += 1;
}
}
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
elems_added = 0;
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
while (next_rp_idx as usize) < total_rangeproof_segments {
if elems_added == max_elements / 3 {
break;
}
let rp_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_rangeproof_mmr_size && !self.has_rangeproof_segment_with_id(rp_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
next_rp_idx += 1;
}
}
let total_kernel_segments = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
elems_added = 0;
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
while (next_kernel_idx as usize) < total_kernel_segments {
if elems_added == max_elements / 3 {
break;
}
let k_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
next_kernel_idx += 1;
}
}
// Explicitly add segment identifier to request if not exists.
let mut maybe_add_to_request = |seg_id: SegmentTypeIdentifier| {
if !return_vec.iter().any(|i| i == &seg_id) {
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(seg_id);
}
};
// Ensure we explicitly ask for the next output segment.
if let Some(next_output_idx) = self.next_required_output_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
if !self.has_output_segment_with_id(seg_id) {
let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id);
maybe_add_to_request(next_output_seg_id);
}
}
// Ensure we explicitly ask for the next rangeproof segment.
if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
if !self.has_rangeproof_segment_with_id(seg_id) {
let next_proof_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id);
maybe_add_to_request(next_proof_seg_id);
}
}
// Always ensure we explicitly ask for the very next kernel segment we are waiting on.
// The regular round-robin above can get saturated with outputs and rangeproofs while
// the desegmenter is blocked on a missing kernel, so we force this one in.
@@ -601,14 +625,9 @@ impl Desegmenter {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
if !self.has_kernel_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_kernel_seg_id);
if !self.has_kernel_segment_with_id(seg_id) {
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
maybe_add_to_request(next_kernel_seg_id);
}
}
if return_vec.is_empty() && self.bitmap_cache.is_some() {
+10
View File
@@ -316,6 +316,16 @@ impl SyncState {
removed_segments
}
/// Drop all tracked PIBD requests, returning how many entries were removed.
pub fn clear_pibd_requests(&self) -> usize {
let mut requests = self.requested_pibd_segments.write();
let cleared = requests.len();
if cleared > 0 {
requests.clear();
}
cleared
}
/// Check whether segment is in request list
pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
self.requested_pibd_segments
+61 -2
View File
@@ -1,7 +1,7 @@
use self::chain::txhashset::{BitmapAccumulator, BitmapSegment};
use self::core::core::pmmr::segment::{Segment, SegmentIdentifier};
use self::core::ser::{
BinReader, BinWriter, DeserializationMode, ProtocolVersion, Readable, Writeable,
self, BinReader, BinWriter, DeserializationMode, ProtocolVersion, Readable, Writeable,
};
use croaring::Bitmap;
use grin_chain as chain;
@@ -10,6 +10,29 @@ use grin_util::secp::rand::Rng;
use rand::thread_rng;
use std::io::Cursor;
fn push_u16(bytes: &mut Vec<u8>, n: u16) {
bytes.extend_from_slice(&n.to_be_bytes());
}
fn push_u64(bytes: &mut Vec<u8>, n: u64) {
bytes.extend_from_slice(&n.to_be_bytes());
}
fn bitmap_segment_header(height: u8, idx: u64, n_blocks: u16) -> Vec<u8> {
let mut bytes = vec![height];
push_u64(&mut bytes, idx);
push_u16(&mut bytes, n_blocks);
bytes
}
fn read_bitmap_segment(bytes: &[u8]) -> Result<BitmapSegment, ser::Error> {
ser::deserialize(
&mut &bytes[..],
ProtocolVersion(1),
DeserializationMode::default(),
)
}
fn test_roundtrip(entries: usize) {
let mut rng = thread_rng();
@@ -63,7 +86,7 @@ fn test_roundtrip(entries: usize) {
assert_eq!(bms, bms2);
// Convert back to `Segment`
let segment2 = Segment::from(bms2);
let segment2 = bms2.into_segment().unwrap();
assert_eq!(segment, segment2);
}
@@ -83,3 +106,39 @@ fn abundant_segment_ser_roundtrip() {
let max = 1 << 16;
test_roundtrip(thread_rng().gen_range(max - 4096, max - 1024));
}
#[test]
fn bitmap_segment_read_rejects_empty_blocks() {
let bytes = bitmap_segment_header(9, 0, 0);
assert_eq!(
read_bitmap_segment(&bytes).err(),
Some(ser::Error::CorruptedData)
);
}
#[test]
fn bitmap_segment_read_rejects_too_many_blocks() {
let bytes = bitmap_segment_header(9, 0, 9);
assert_eq!(
read_bitmap_segment(&bytes).err(),
Some(ser::Error::TooLargeReadErr)
);
}
#[test]
fn bitmap_segment_read_rejects_too_large_height() {
let bytes = bitmap_segment_header(14, 0, 1);
assert_eq!(
read_bitmap_segment(&bytes).err(),
Some(ser::Error::TooLargeReadErr)
);
}
#[test]
fn bitmap_segment_read_rejects_offset_overflow() {
let bytes = bitmap_segment_header(13, u64::MAX, 1);
assert_eq!(
read_bitmap_segment(&bytes).err(),
Some(ser::Error::TooLargeReadErr)
);
}
+5 -5
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_config"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Configuration for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -16,10 +16,10 @@ serde_derive = "1"
toml = "0.5"
dirs = "2.0"
grin_core = { path = "../core", version = "5.4.0" }
grin_servers = { path = "../servers", version = "5.4.0" }
grin_p2p = { path = "../p2p", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_servers = { path = "../servers", version = "5.4.1" }
grin_p2p = { path = "../p2p", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
[dev-dependencies]
pretty_assertions = "0.6.1"
+3 -3
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_core"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -28,8 +28,8 @@ chrono = { version = "0.4.11", features = ["serde"] }
zeroize = { version = "1.1", features =["zeroize_derive"] }
bytes = "0.5"
keychain = { package = "grin_keychain", path = "../keychain", version = "5.4.0" }
util = { package = "grin_util", path = "../util", version = "5.4.0" }
keychain = { package = "grin_keychain", path = "../keychain", version = "5.4.1" }
util = { package = "grin_util", path = "../util", version = "5.4.1" }
[dev-dependencies]
serde_json = "1"
+41 -38
View File
@@ -21,6 +21,39 @@ use croaring::Bitmap;
use std::cmp::min;
use std::fmt::Debug;
const MAX_SEGMENT_READ_ITEMS: u64 = 1_000_000;
const SEGMENT_READ_PREALLOC_ITEMS: u64 = 1024;
fn read_segment_item_count<R: Reader>(reader: &mut R) -> Result<u64, Error> {
let count = reader.read_u64()?;
if count > MAX_SEGMENT_READ_ITEMS {
return Err(Error::TooLargeReadErr);
}
Ok(count)
}
fn read_segment_positions<R: Reader>(reader: &mut R, count: u64) -> Result<Vec<u64>, Error> {
let mut positions = Vec::with_capacity(min(count, SEGMENT_READ_PREALLOC_ITEMS) as usize);
let mut last_pos = 0;
for _ in 0..count {
let pos = reader.read_u64()?;
if pos <= last_pos {
return Err(Error::SortError);
}
last_pos = pos;
positions.push(pos - 1);
}
Ok(positions)
}
fn read_segment_items<T: Readable, R: Reader>(reader: &mut R, count: u64) -> Result<Vec<T>, Error> {
let mut items = Vec::with_capacity(min(count, SEGMENT_READ_PREALLOC_ITEMS) as usize);
for _ in 0..count {
items.push(T::read(reader)?);
}
Ok(items)
}
#[derive(Clone, Debug, Eq, PartialEq)]
/// Possible segment types, according to this desegmenter
pub enum SegmentType {
@@ -568,39 +601,13 @@ impl<T: Readable> Readable for Segment<T> {
fn read<R: Reader>(reader: &mut R) -> Result<Self, Error> {
let identifier = Readable::read(reader)?;
let n_hashes = reader.read_u64()? as usize;
let mut hash_pos = Vec::with_capacity(n_hashes);
let mut last_pos = 0;
for _ in 0..n_hashes {
let pos = reader.read_u64()?;
if pos <= last_pos {
return Err(Error::SortError);
}
last_pos = pos;
hash_pos.push(pos - 1);
}
let n_hashes = read_segment_item_count(reader)?;
let hash_pos = read_segment_positions(reader, n_hashes)?;
let hashes = read_segment_items(reader, n_hashes)?;
let mut hashes = Vec::<Hash>::with_capacity(n_hashes);
for _ in 0..n_hashes {
hashes.push(Readable::read(reader)?);
}
let n_leaves = reader.read_u64()? as usize;
let mut leaf_pos = Vec::with_capacity(n_leaves);
last_pos = 0;
for _ in 0..n_leaves {
let pos = reader.read_u64()?;
if pos <= last_pos {
return Err(Error::SortError);
}
last_pos = pos;
leaf_pos.push(pos - 1);
}
let mut leaf_data = Vec::<T>::with_capacity(n_leaves);
for _ in 0..n_leaves {
leaf_data.push(Readable::read(reader)?);
}
let n_leaves = read_segment_item_count(reader)?;
let leaf_pos = read_segment_positions(reader, n_leaves)?;
let leaf_data = read_segment_items(reader, n_leaves)?;
let proof = Readable::read(reader)?;
@@ -823,12 +830,8 @@ impl SegmentProof {
impl Readable for SegmentProof {
fn read<R: Reader>(reader: &mut R) -> Result<Self, Error> {
let n_hashes = reader.read_u64()? as usize;
let mut hashes = Vec::with_capacity(n_hashes);
for _ in 0..n_hashes {
let hash: Hash = Readable::read(reader)?;
hashes.push(hash);
}
let n_hashes = read_segment_item_count(reader)?;
let hashes = read_segment_items(reader, n_hashes)?;
Ok(Self { hashes })
}
}
+32
View File
@@ -16,10 +16,15 @@ mod common;
use self::core::core::pmmr;
use self::core::core::{Segment, SegmentIdentifier};
use self::core::ser::{self, DeserializationMode, ProtocolVersion};
use common::TestElem;
use grin_core as core;
use grin_core::core::pmmr::ReadablePMMR;
fn push_u64(bytes: &mut Vec<u8>, n: u64) {
bytes.extend_from_slice(&n.to_be_bytes());
}
fn test_unprunable_size(height: u8, n_leaves: u32) {
let size = 1u64 << height;
let n_segments = (n_leaves as u64 + size - 1) / size;
@@ -59,3 +64,30 @@ fn unprunable_mmr() {
test_unprunable_size(3, i);
}
}
#[test]
fn segment_read_rejects_large_hash_count() {
let mut bytes = vec![1];
push_u64(&mut bytes, 0);
push_u64(&mut bytes, 1_000_001);
let res: Result<Segment<TestElem>, _> = ser::deserialize(
&mut &bytes[..],
ProtocolVersion(1),
DeserializationMode::default(),
);
assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr));
}
#[test]
fn segment_proof_read_rejects_large_hash_count() {
let mut bytes = vec![];
push_u64(&mut bytes, 1_000_001);
let res: Result<self::core::core::SegmentProof, _> = ser::deserialize(
&mut &bytes[..],
ProtocolVersion(1),
DeserializationMode::default(),
);
assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr));
}
+2 -2
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_keychain"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -26,4 +26,4 @@ ripemd160 = "0.9"
sha2 = "0.9"
pbkdf2 = "0.8"
grin_util = { path = "../util", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.1" }
+6 -6
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_p2p"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -23,13 +23,13 @@ log = "0.4"
chrono = { version = "0.4.11", features = ["serde"] }
bytes = "0.5"
grin_core = { path = "../core", version = "5.4.0" }
grin_store = { path = "../store", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_chain = { path = "../chain", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_store = { path = "../store", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
grin_chain = { path = "../chain", version = "5.4.1" }
[dev-dependencies]
grin_pool = { path = "../pool", version = "5.4.0" }
grin_pool = { path = "../pool", version = "5.4.1" }
[build-dependencies]
built = { version = "0.8.0", features = ["git2"]}
+2 -2
View File
@@ -48,9 +48,9 @@ pub mod built_info {
/// Grin's user agent with current version
pub fn user_agent() -> String {
format!(
"MW/Grin {}{}",
"MW/Grim {}{}",
env!("CARGO_PKG_VERSION"),
built_info::GIT_COMMIT_HASH_SHORT.map_or_else(|| "".to_owned(), |v| ".".to_owned() + v)
built_info::GIT_COMMIT_HASH_SHORT.map_or_else(|| "+".to_owned(), |v| ".".to_owned() + v)
)
}
+1 -2
View File
@@ -13,6 +13,7 @@
// limitations under the License.
use crate::util::{Mutex, RwLock};
use lru_cache::LruCache;
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
@@ -20,8 +21,6 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use lru_cache::LruCache;
use crate::chain;
use crate::chain::txhashset::BitmapChunk;
use crate::conn;
+80
View File
@@ -45,6 +45,7 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
blocked: RwLock<HashMap<PeerAddr, (DateTime<Utc>, u32)>>,
config: P2PConfig,
}
@@ -55,6 +56,7 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
blocked: RwLock::new(HashMap::new()),
}
}
@@ -445,6 +447,74 @@ impl Peers {
}
}
/// Disconnect a peer without banning it.
pub fn disconnect_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
let mut peers = self.peers.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("disconnect_peer: failed to get peers lock");
Error::PeerException
})?;
match peers.remove(&peer_addr) {
Some(peer) => {
warn!("disconnecting peer {} ({})", peer_addr, reason);
peer.stop();
Ok(())
}
None => Err(Error::PeerNotFound),
}
}
/// Whether this peer has been blocked.
pub fn is_blocked(&self, peer_addr: PeerAddr) -> bool {
match self.blocked.try_read_for(LOCK_TIMEOUT) {
Some(peers) => match peers.get(&peer_addr) {
None => false,
Some((expiry, _)) => expiry > &Utc::now(),
},
None => {
error!("is_blocked: failed to get peers lock");
false
}
}
}
/// Temporary block a peer without banning it.
pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("block_peer: failed to get blocked lock");
Error::PeerException
})?;
let times = {
match blocked.get(&peer_addr) {
Some((_, times)) => times + 1,
None => 1,
}
};
let duration = match times {
1 => 60, // 1m
2 => 180, // 3m
_ => 600, // 10m
};
let expiry = Utc::now() + Duration::seconds(duration);
blocked.insert(peer_addr, (expiry, times));
warn!(
"state_sync: block peer {} ({}) for {} times: {}",
peer_addr, reason, duration, times
);
Ok(())
}
/// Unblock all blocked peers.
pub fn unblock_peers(&self) -> Result<(), Error> {
let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
error!("unblock_peers: failed to get blocked lock");
Error::PeerException
})?;
blocked.clear();
Ok(())
}
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
self.iter().outbound().connected().count()
@@ -832,6 +902,16 @@ impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
}
}
/// Custom filter.
pub fn with_filter(
self,
f: impl Fn(&Arc<Peer>) -> bool,
) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
PeersIter {
iter: self.iter.filter(move |p| f(p)),
}
}
pub fn by_addr(&mut self, addr: PeerAddr) -> Option<Arc<Peer>> {
self.iter.find(|p| p.info.addr == addr)
}
+1 -1
View File
@@ -382,7 +382,7 @@ impl MessageHandler for Protocol {
block_hash,
output_root
);
adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
adapter.receive_bitmap_segment(block_hash, output_root, segment.into_segment()?)?;
Consumed::None
}
Message::OutputSegment(req) => {
+8 -1
View File
@@ -18,6 +18,7 @@ use grin_p2p as p2p;
use grin_util as util;
use grin_util::StopState;
use std::fs;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::{thread, time};
@@ -44,11 +45,17 @@ fn test_setup() {
util::init_test_logger();
}
fn clean_output_dir(dir_name: &str) {
let _ = fs::remove_dir_all(dir_name);
}
// Starts a server and connects a client peer to it to check handshake,
// followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
test_setup();
let test_dir = "target/peer_handshake";
clean_output_dir(test_dir);
let p2p_config = p2p::P2PConfig {
host: "127.0.0.1".parse().unwrap(),
@@ -60,7 +67,7 @@ fn peer_handshake() {
let net_adapter = Arc::new(p2p::DummyAdapter {});
let server = Arc::new(
p2p::Server::new(
".grin",
test_dir,
p2p::Capabilities::UNKNOWN,
p2p_config.clone(),
net_adapter.clone(),
+5 -5
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_pool"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Chain implementation for grin, a simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -18,9 +18,9 @@ thiserror = "1"
log = "0.4"
chrono = "0.4.11"
grin_core = { path = "../core", version = "5.4.0" }
grin_keychain = { path = "../keychain", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_keychain = { path = "../keychain", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
[dev-dependencies]
grin_chain = { path = "../chain", version = "5.4.0" }
grin_chain = { path = "../chain", version = "5.4.1" }
+9 -9
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_servers"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -27,11 +27,11 @@ async-stream = "0.3"
rustls = "0.20"
walkdir = "2.3.1"
grin_api = { path = "../api", version = "5.4.0" }
grin_chain = { path = "../chain", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.0" }
grin_keychain = { path = "../keychain", version = "5.4.0" }
grin_p2p = { path = "../p2p", version = "5.4.0" }
grin_pool = { path = "../pool", version = "5.4.0" }
grin_store = { path = "../store", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_api = { path = "../api", version = "5.4.1" }
grin_chain = { path = "../chain", version = "5.4.1" }
grin_core = { path = "../core", version = "5.4.1" }
grin_keychain = { path = "../keychain", version = "5.4.1" }
grin_p2p = { path = "../p2p", version = "5.4.1" }
grin_pool = { path = "../pool", version = "5.4.1" }
grin_store = { path = "../store", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
+7 -3
View File
@@ -194,7 +194,8 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
return;
}
if !peers.enough_outbound_peers() {
let enough_outbound = peers.enough_outbound_peers();
if !enough_outbound {
// loop over connected peers that can provide peer lists
// ask them for their list of peers
let mut connected_peers: Vec<PeerAddr> = vec![];
@@ -239,7 +240,8 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
.iter()
.filter(|p| {
peers.get_connected_peer(p.addr).is_none()
&& Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
&& (!enough_outbound
|| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
})
.choose_multiple(&mut thread_rng(), max_peer_attempts / 2)
{
@@ -268,7 +270,9 @@ fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sende
// check min 32 (max 128, if there are no healthy and unknown) random defunct peers no more often than 1 hour per peer.
for dp in defuncts
.iter()
.filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
.filter(|p| {
!enough_outbound || Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
})
.choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len())
{
new_peers.push(&dp.addr);
+119 -43
View File
@@ -14,6 +14,7 @@
use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use grin_p2p::PeerAddr;
use std::sync::Arc;
use crate::chain::{self, pibd_params, SyncState, SyncStatus};
@@ -256,6 +257,46 @@ impl StateSync {
.sync_state
.remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS);
if !stale_segments.is_empty() {
for (seg_id, peer_addr) in stale_segments.iter() {
if let Some(peer_addr) = peer_addr {
let _ = self
.peers
.block_peer(PeerAddr(*peer_addr), "PIBD segment timeout");
debug!(
"state_sync: peer {} moved to PIBD retry exclusion list for segment {:?}",
peer_addr, seg_id
);
let is_outbound = {
self.peers
.iter()
.outbound()
.by_addr(PeerAddr(peer_addr.clone()))
.is_some()
};
if is_outbound {
debug!("state_sync: disconnecting peer {}", peer_addr);
if let Err(e) = self
.peers
.disconnect_peer(PeerAddr(*peer_addr), "PIBD segment timeout")
{
debug!(
"state_sync: failed to disconnect timed-out peer {}: {:?}",
peer_addr, e
);
}
} else {
debug!("state_sync: peer {} is not outbound or not connected, do not disconnect", peer_addr);
}
} else {
debug!(
"state_sync: PIBD request {:?} timed out without a recorded peer",
seg_id
);
}
}
}
// Apply segments... TODO: figure out how this should be called, might
// need to be a separate thread.
if let Some(mut de) = desegmenter.try_write() {
@@ -318,11 +359,28 @@ impl StateSync {
.connected()
};
// Get peers with reasonable height for pibd.
let height_slack = pibd_params::PIBD_PEER_HEIGHT_SLACK_BLOCKS;
let max_pibd_height = peers_iter_pibd()
.into_iter()
.map(|p| p.info.height())
.max()
.unwrap_or(0);
let available_pibd_peers = || {
peers_iter_pibd().with_filter(|p| {
p.info.height().saturating_add(height_slack) >= max_pibd_height
})
};
// If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute,
// abort PIBD and fall back to txhashset download
// Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled
// peer having the max difficulty
if peers_iter_pibd().count() == 0 {
if available_pibd_peers()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.count() == 0
{
if let None = self.earliest_zero_pibd_peer_time {
self.set_earliest_zero_pibd_peer_time(Some(Utc::now()));
}
@@ -339,72 +397,89 @@ impl StateSync {
self.set_pibd_aborted();
return false;
}
} else {
self.set_earliest_zero_pibd_peer_time(None)
let cleared = self.sync_state.clear_pibd_requests();
if cleared > 0 {
warn!(
"state_sync: cleared {} pending PIBD requests because no PIBD-enabled peers are currently available",
cleared
);
}
continue;
}
self.set_earliest_zero_pibd_peer_time(None);
// Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible.
let excluded_peer = stale_segments
.iter()
.find(|(stale_id, _)| stale_id == seg_id)
.and_then(|(_, addr)| *addr);
let peer = peers_iter_pibd()
let peer = available_pibd_peers()
.outbound()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.exclude(excluded_peer)
.choose_random()
.or_else(|| {
peers_iter_pibd()
available_pibd_peers()
.inbound()
.with_filter(|p| !peers.is_blocked(p.info.addr))
.exclude(excluded_peer)
.choose_random()
});
trace!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
// add to list of segments that are being tracked
self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
let res = match seg_id.segment_type {
SegmentType::Bitmap => p.send_bitmap_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::Output => p.send_output_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::RangeProof => p.send_rangeproof_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::Kernel => p.send_kernel_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
};
if let Err(e) = res {
info!(
"Error sending request to peer at {}, reason: {:?}",
p.info.addr, e
let p = match peer {
Some(p) => p,
None => {
debug!(
"state_sync: no eligible PIBD peers available for request {:?}",
seg_id
);
continue;
}
};
// add to list of segments that are being tracked
self.sync_state.add_pibd_segment(seg_id, p.info.addr.0);
let res = match seg_id.segment_type {
SegmentType::Bitmap => {
p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone())
}
SegmentType::Output => {
p.send_output_segment_request(archive_header.hash(), seg_id.identifier.clone())
}
SegmentType::RangeProof => p.send_rangeproof_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::Kernel => {
p.send_kernel_segment_request(archive_header.hash(), seg_id.identifier.clone())
}
};
if let Err(e) = res {
info!(
"Error sending request to peer at {}, reason: {:?}",
p.info.addr, e
);
self.sync_state.remove_pibd_segment(seg_id);
} else if let Some(prev_peer) = excluded_peer {
if p.info.addr.0 != prev_peer {
info!(
"state_sync: retrying segment {:?} with new peer {} (previously {})",
seg_id, p.info.addr, prev_peer
);
self.sync_state.remove_pibd_segment(seg_id);
} else if let Some(prev_peer) = excluded_peer {
if p.info.addr.0 != prev_peer {
info!(
"state_sync: retrying segment {:?} with new peer {} (previously {})",
seg_id, p.info.addr, prev_peer
);
} else {
debug!(
"state_sync: requested segment {:?} from peer {}",
seg_id, p.info.addr
);
}
} else {
debug!(
"state_sync: requested segment {:?} from peer {}",
seg_id, p.info.addr
);
}
} else {
debug!(
"state_sync: requested segment {:?} from peer {}",
seg_id, p.info.addr
);
}
}
false
@@ -495,6 +570,7 @@ impl StateSync {
}
fn state_sync_reset(&mut self) {
let _ = self.peers.unblock_peers();
self.prev_state_sync = None;
self.state_sync_peer = None;
}
+3 -3
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_store"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"
@@ -21,8 +21,8 @@ serde_derive = "1"
thiserror = "1"
log = "0.4"
grin_core = { path = "../core", version = "5.4.0" }
grin_util = { path = "../util", version = "5.4.0" }
grin_core = { path = "../core", version = "5.4.1" }
grin_util = { path = "../util", version = "5.4.1" }
[dev-dependencies]
chrono = "0.4.11"
+81 -80
View File
@@ -228,25 +228,28 @@ impl Store {
}
// Database setup.
let r_env_map = env_map.read();
let env = r_env_map.get(&full_path).unwrap().env.clone();
let mut write = env.write_txn()?;
let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME);
let def_db = env.create_database(&mut write, Some(def_name))?;
let mut dbs_map = HashMap::<u8, Database<Bytes, Bytes>>::new();
for p in prefixes {
let db = env.create_database(&mut write, Some(p.to_string().as_str()))?;
dbs_map.insert(p, db);
}
write.commit()?;
let s = {
let r_env_map = env_map.read();
let env = r_env_map.get(&full_path).unwrap().env.clone();
let mut write = env.write_txn()?;
let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME);
let def_db = env.create_database(&mut write, Some(def_name))?;
let mut dbs_map = HashMap::<u8, Database<Bytes, Bytes>>::new();
for p in prefixes {
let db = env.create_database(&mut write, Some(p.to_string().as_str()))?;
dbs_map.insert(p, db);
}
write.commit()?;
let s = Store {
env: env.clone(),
env_path: full_path.clone(),
pre_dbs: Arc::new(dbs_map),
def_db,
version: DEFAULT_DB_VERSION,
alloc_chunk_size,
let s = Store {
env: env.clone(),
env_path: full_path.clone(),
pre_dbs: Arc::new(dbs_map),
def_db,
version: DEFAULT_DB_VERSION,
alloc_chunk_size,
};
s
};
// Migrate to default environment if needed.
@@ -632,12 +635,9 @@ impl Store {
Ok(read) => {
let db_res = self.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
self,
Arc::new(db.clone()),
read,
deserialize,
)),
Ok(db) => {
DatabaseIterator::new(self, Arc::new(db.clone()), read, deserialize)
}
Err(e) => Err(Error::from(e)),
}
}
@@ -794,12 +794,12 @@ impl<'a> Batch<'a> {
Ok(read) => {
let db_res = self.store.get_db(db_key);
match db_res {
Ok(db) => Ok(DatabaseIterator::new(
Ok(db) => DatabaseIterator::new(
self.store,
Arc::new(db.clone()),
read,
deserialize,
)),
),
Err(e) => Err(Error::from(e)),
}
}
@@ -876,9 +876,9 @@ where
db: Arc<Database<Bytes, Bytes>>,
read: Arc<RoTxn<'a, WithoutTls>>,
keys: Vec<Vec<u8>>,
total_keys: usize,
skip_cur: usize,
skip_total: usize,
done: bool,
deserialize: F,
#[allow(dead_code)]
tx_counter: TxCounter,
@@ -891,44 +891,37 @@ where
type Item = Result<T, Error>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
self.skip_total += 1;
self.skip_cur += 1;
match self.db.get(&self.read, k) {
Ok(v) => {
if let Some(v) = v {
return match (self.deserialize)(k, v) {
Ok(v) => Some(Ok(v)),
Err(e) => {
error!("db iter: error deserializing: {}", e);
Some(Err(Error::from(e)))
}
};
}
}
Err(e) => {
return {
error!("db iter: error read value: {}", e);
Some(Err(Error::from(e)))
loop {
if self.done {
return None;
} else if let Some(k) = self.keys.iter().skip(self.skip_cur).next() {
self.skip_total += 1;
self.skip_cur += 1;
match self.db.get(&self.read, k) {
Ok(v) => {
if let Some(v) = v {
return match (self.deserialize)(k, v) {
Ok(v) => Some(Ok(v)),
Err(e) => {
error!("db iter: error deserializing: {}", e);
Some(Err(Error::from(e)))
}
};
}
}
Err(e) => {
return {
error!("db iter: error read value: {}", e);
Some(Err(Error::from(e)))
}
}
}
} else if let Err(e) = self.load_next_keys() {
error!("db iter: error read keys: {}", e);
self.done = true;
return Some(Err(e));
}
} else if self.total_keys > self.skip_total {
let keys = if let Ok(iter) = self.db.iter(&self.read) {
iter.move_between_keys()
.skip(self.skip_total)
.take(10000)
.filter(|kv| kv.is_ok())
.map(|kv| kv.unwrap().0.to_vec())
.collect::<Vec<Vec<u8>>>()
} else {
vec![]
};
self.skip_cur = 0;
self.keys = keys;
return self.next();
}
None
}
}
@@ -942,34 +935,42 @@ where
db: Arc<Database<Bytes, Bytes>>,
read: RoTxn<'a, WithoutTls>,
deserialize: F,
) -> DatabaseIterator<'a, F, T> {
let (keys, total_keys) = if let Ok(iter) = db.iter(&read) {
let total = iter.move_between_keys().count();
let keys = if let Ok(iter) = db.iter(&read) {
iter.move_between_keys()
.take(10000)
.filter(|kv| kv.is_ok())
.map(|kv| kv.unwrap().0.to_vec())
.collect::<Vec<Vec<u8>>>()
} else {
vec![]
};
(keys, total)
} else {
(vec![], 0)
};
DatabaseIterator {
) -> Result<DatabaseIterator<'a, F, T>, Error> {
// load keys before constructing tx_counter to avoid double-decrementing open_txs_count on error
let keys = Self::read_key_page(&db, &read, 0)?;
let done = keys.is_empty();
Ok(DatabaseIterator {
db,
read: Arc::new(read),
keys,
total_keys,
skip_cur: 0,
skip_total: 0,
done,
deserialize,
tx_counter: TxCounter {
env_path: store.env_path.clone(),
},
}
})
}
fn load_next_keys(&mut self) -> Result<(), Error> {
self.keys = Self::read_key_page(&self.db, &self.read, self.skip_total)?;
self.skip_cur = 0;
self.done = self.keys.is_empty();
Ok(())
}
fn read_key_page(
db: &Database<Bytes, Bytes>,
read: &RoTxn<'a, WithoutTls>,
skip: usize,
) -> Result<Vec<Vec<u8>>, Error> {
let iter = db.iter(read)?;
iter.move_between_keys()
.skip(skip)
.take(10000)
.map(|kv| kv.map(|(k, _)| k.to_vec()).map_err(Error::from))
.collect::<Result<Vec<Vec<u8>>, Error>>()
}
}
+26
View File
@@ -140,6 +140,32 @@ fn test_iter() -> Result<(), store::Error> {
Ok(())
}
#[test]
fn test_iter_pages() -> Result<(), store::Error> {
let test_dir = "target/test_iter_pages";
setup(test_dir);
let prefix = b'P';
let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None, None)?;
{
let mut batch = store.batch()?;
for i in 0..10_001u32 {
batch.put(Some(prefix), &i.to_be_bytes(), &[1])?;
}
batch.commit()?;
}
let count = store
.iter(Some(prefix), |_, v| Ok(v.to_vec()))?
.collect::<Result<Vec<_>, _>>()?
.len();
assert_eq!(count, 10_001);
clean_output_dir(test_dir);
Ok(())
}
#[test]
fn lmdb_allocate() -> Result<(), store::Error> {
let test_dir = "target/lmdb_allocate";
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "grin_util"
version = "5.4.0"
version = "5.4.1"
authors = ["Grin Developers <mimblewimble@lists.launchpad.net>"]
description = "Simple, private and scalable cryptocurrency implementation based on the Mimblewimble chain format."
license = "Apache-2.0"