Migrate from lmdb-zero to heed (#3825)
* db: migrate from lmdb-zero to heed * fix: check resizing operation and wait to avoid crash with multiple batches, fix exists check at batch * build: fix missing deps at Cargo.lock * lmdb: single environment, migrate existing databases with provided non-default environment name * fix: revert chunk size to 128mb * lmdb: ability to use multiple shared environments * build: remove unused dependency * fix: resize to have correct multiplier of the system page size * lmdb: speed up prefix iter by storing keys * lmdb: default env name * lmdb: wait db resize before read, reduce timeout before resizing * lmdb: use static reader for iterator, count existing batches for stable resize * fix: check batches count on resize waiting * lmdb: use separate databases instead of prefixes, use default db for values without prefixes, migrate old environment * fix: pop pos key * lmdb: count all open transactions to finish before resizing * lmdb: immediate resize if there are no open transactions * lmdb: remove env state when there are no more stores * lmdb: use atomic for resize and resize checking flags * lmdb: sleep 10ms when waiting all opened txs to be closed * lmdb: use atomic open txs and stores count * lmdb: use index to detect separator, ignore unknown db key to not have a panic * lmdb: store max 10k keys in the iterator * lmdb: check iter result on getting total * lmdb: handle errors at iterator * lmdb: handle an error when db with provided key not found * lmdb: fix iterate over 10k keys * lmdb: document migration resize safety * lmdb: fix iter test * lmdb: clear new db after unsuccessful migration, handle read error on migration to interrupt process * store: bring back old key methods to reproduce data migration * lmdb: return an error on unsuccessful migration * lmdb: migration test, clean data after allocate test * lmdb: info migration log * fix: move iterator before handling an error to allow skip bad value * lmdb: return an error if removal of old DB file failed after migration * lmdb: lifetime for iterator, use write transaction at batch iterator * lmdb: migration progress * fix: tests * lmdb: immediately set resizing flag, ignore resizing flag while there are more than 0 opened txs to avoid stuck, optimize tx counter for some operations * lmdb: key for successful migration * lmdb: fix put database creation at separate block to avoid lifetime issues when returning an error on migration
This commit is contained in:
+38
-25
@@ -275,22 +275,33 @@ impl Peers {
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over all peers we know about (stored in our db).
|
||||
pub fn peer_data_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
|
||||
self.store.peers_iter().map_err(From::from)
|
||||
}
|
||||
|
||||
/// Convenience for reading all peer data from the db.
|
||||
pub fn all_peer_data(&self) -> Vec<PeerData> {
|
||||
self.peer_data_iter()
|
||||
.map(|peers| peers.collect())
|
||||
.unwrap_or(vec![])
|
||||
match self.store.iter_batch() {
|
||||
Ok(batch) => match batch.peers_iter() {
|
||||
Ok(iter) => iter
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.collect(),
|
||||
Err(e) => {
|
||||
error!("failed to get all peer data: {:?}", e);
|
||||
vec![]
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("failed to get all peer data: {:?}", e);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Find peers in store (not necessarily connected) and return their data
|
||||
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
|
||||
match self.store.find_peers(state, cap, count) {
|
||||
Ok(peers) => peers,
|
||||
match self.store.iter_batch() {
|
||||
Ok(batch) => batch.find_peers(state, cap, count).unwrap_or_else(|e| {
|
||||
error!("failed to find peers: {:?}", e);
|
||||
vec![]
|
||||
}),
|
||||
Err(e) => {
|
||||
error!("failed to find peers: {:?}", e);
|
||||
vec![]
|
||||
@@ -445,24 +456,26 @@ impl Peers {
|
||||
let now = Utc::now();
|
||||
|
||||
// Delete defunct peers from storage
|
||||
let _ = self.store.delete_peers(|peer| {
|
||||
let diff = now - Utc.timestamp_opt(peer.last_connected, 0).unwrap();
|
||||
if let Ok(batch) = self.store.iter_batch() {
|
||||
let _ = batch.delete_peers(|peer| {
|
||||
let diff = now - Utc.timestamp_opt(peer.last_connected, 0).unwrap();
|
||||
|
||||
let should_remove = peer.flags == State::Defunct
|
||||
&& diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME);
|
||||
let should_remove = peer.flags == State::Defunct
|
||||
&& diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME);
|
||||
|
||||
if should_remove {
|
||||
debug!(
|
||||
"removing peer {:?}: last connected {} days {} hours {} minutes ago.",
|
||||
peer.addr,
|
||||
diff.num_days(),
|
||||
diff.num_hours(),
|
||||
diff.num_minutes()
|
||||
);
|
||||
}
|
||||
if should_remove {
|
||||
debug!(
|
||||
"removing peer {:?}: last connected {} days {} hours {} minutes ago.",
|
||||
peer.addr,
|
||||
diff.num_days(),
|
||||
diff.num_hours(),
|
||||
diff.num_minutes()
|
||||
);
|
||||
}
|
||||
|
||||
should_remove
|
||||
});
|
||||
should_remove
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+80
-61
@@ -20,7 +20,7 @@ use rand::prelude::*;
|
||||
|
||||
use crate::core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer};
|
||||
use crate::types::{Capabilities, PeerAddr, ReasonForBan};
|
||||
use grin_store::{self, option_to_not_found, to_key, Error};
|
||||
use grin_store::{self, option_to_not_found, Error};
|
||||
|
||||
const DB_NAME: &str = "peer";
|
||||
const STORE_SUBPATH: &str = "peers";
|
||||
@@ -118,45 +118,94 @@ pub struct PeerStore {
|
||||
impl PeerStore {
|
||||
/// Instantiates a new peer store under the provided root path.
|
||||
pub fn new(db_root: &str) -> Result<PeerStore, Error> {
|
||||
let db = grin_store::Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None)?;
|
||||
Ok(PeerStore { db: db })
|
||||
let db = grin_store::Store::new(
|
||||
db_root,
|
||||
Some(DB_NAME),
|
||||
Some(STORE_SUBPATH),
|
||||
vec![PEER_PREFIX],
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
Ok(PeerStore { db })
|
||||
}
|
||||
|
||||
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
|
||||
debug!("save_peer: {:?} marked {:?}", p.addr, p.flags);
|
||||
|
||||
let batch = self.db.batch()?;
|
||||
batch.put_ser(&peer_key(p.addr)[..], p)?;
|
||||
let mut batch = self.db.batch()?;
|
||||
let key = p.addr.as_key();
|
||||
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), p)?;
|
||||
batch.commit()
|
||||
}
|
||||
|
||||
pub fn save_peers(&self, p: Vec<PeerData>) -> Result<(), Error> {
|
||||
let batch = self.db.batch()?;
|
||||
let mut batch = self.db.batch()?;
|
||||
for pd in p {
|
||||
debug!("save_peers: {:?} marked {:?}", pd.addr, pd.flags);
|
||||
batch.put_ser(&peer_key(pd.addr)[..], &pd)?;
|
||||
let key = pd.addr.as_key();
|
||||
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &pd)?;
|
||||
}
|
||||
batch.commit()
|
||||
}
|
||||
|
||||
pub fn get_peer(&self, peer_addr: PeerAddr) -> Result<PeerData, Error> {
|
||||
option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..], None), || {
|
||||
format!("Peer at address: {}", peer_addr)
|
||||
})
|
||||
let key = peer_addr.as_key();
|
||||
option_to_not_found(
|
||||
self.db.get_ser(Some(PEER_PREFIX), key.as_bytes(), None),
|
||||
|| format!("Peer at address: {}", peer_addr),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result<bool, Error> {
|
||||
self.db.exists(&peer_key(peer_addr)[..])
|
||||
let key = peer_addr.as_key();
|
||||
self.db.exists(Some(PEER_PREFIX), key.as_bytes())
|
||||
}
|
||||
|
||||
/// TODO - allow below added to avoid github issue reports
|
||||
#[allow(dead_code)]
|
||||
pub fn delete_peer(&self, peer_addr: PeerAddr) -> Result<(), Error> {
|
||||
let batch = self.db.batch()?;
|
||||
batch.delete(&peer_key(peer_addr)[..])?;
|
||||
/// Convenience method to load a peer data, update its status and save it
|
||||
/// back. If new state is Banned its last banned time will be updated too.
|
||||
/// If new state is Defunct last connection attempt will be updated too.
|
||||
pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
|
||||
let mut batch = self.db.batch()?;
|
||||
let key = peer_addr.as_key();
|
||||
let mut peer = option_to_not_found(
|
||||
batch.get_ser::<PeerData>(Some(PEER_PREFIX), key.as_bytes(), None),
|
||||
|| format!("Peer at address: {}", peer_addr),
|
||||
)?;
|
||||
peer.flags = new_state;
|
||||
if new_state == State::Banned {
|
||||
peer.last_banned = Utc::now().timestamp();
|
||||
} else {
|
||||
peer.last_attempt = Utc::now().timestamp();
|
||||
}
|
||||
|
||||
batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &peer)?;
|
||||
batch.commit()
|
||||
}
|
||||
|
||||
/// Builds a new iterator batch to be used with this store.
|
||||
pub fn iter_batch(&self) -> Result<PeersIterBatch<'_>, Error> {
|
||||
Ok(PeersIterBatch {
|
||||
db: self.db.batch()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PeersIterBatch<'a> {
|
||||
db: grin_store::Batch<'a>,
|
||||
}
|
||||
|
||||
impl<'a> PeersIterBatch<'a> {
|
||||
/// Iterator over all known peers.
|
||||
pub fn peers_iter(
|
||||
&'a self,
|
||||
) -> Result<impl Iterator<Item = Result<PeerData, Error>> + 'a, Error> {
|
||||
let protocol_version = self.db.protocol_version();
|
||||
self.db.iter(Some(PEER_PREFIX), move |_, mut v| {
|
||||
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
|
||||
.map_err(From::from)
|
||||
})
|
||||
}
|
||||
|
||||
/// Find some peers in our local db.
|
||||
pub fn find_peers(
|
||||
&self,
|
||||
@@ -166,78 +215,48 @@ impl PeerStore {
|
||||
) -> Result<Vec<PeerData>, Error> {
|
||||
let peers = self
|
||||
.peers_iter()?
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.filter(|p| p.flags == state && p.capabilities.contains(cap))
|
||||
.choose_multiple(&mut thread_rng(), count);
|
||||
Ok(peers)
|
||||
}
|
||||
|
||||
/// Iterator over all known peers.
|
||||
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
|
||||
let key = to_key(PEER_PREFIX, "");
|
||||
let protocol_version = self.db.protocol_version();
|
||||
self.db.iter(&key, move |_, mut v| {
|
||||
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
|
||||
.map_err(From::from)
|
||||
})
|
||||
}
|
||||
|
||||
/// List all known peers
|
||||
/// Used for /v1/peers/all api endpoint
|
||||
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
|
||||
let peers: Vec<PeerData> = self.peers_iter()?.collect();
|
||||
let peers: Vec<PeerData> = self
|
||||
.peers_iter()?
|
||||
.filter(|p| p.is_ok())
|
||||
.map(|p| p.ok().unwrap())
|
||||
.collect();
|
||||
Ok(peers)
|
||||
}
|
||||
|
||||
/// Convenience method to load a peer data, update its status and save it
|
||||
/// back. If new state is Banned its last banned time will be updated too.
|
||||
/// If new state is Defunct last connection attempt will be updated too.
|
||||
pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
|
||||
let batch = self.db.batch()?;
|
||||
|
||||
let mut peer = option_to_not_found(
|
||||
batch.get_ser::<PeerData>(&peer_key(peer_addr)[..], None),
|
||||
|| format!("Peer at address: {}", peer_addr),
|
||||
)?;
|
||||
peer.flags = new_state;
|
||||
if new_state == State::Banned {
|
||||
peer.last_banned = Utc::now().timestamp();
|
||||
} else {
|
||||
peer.last_attempt = Utc::now().timestamp();
|
||||
}
|
||||
|
||||
batch.put_ser(&peer_key(peer_addr)[..], &peer)?;
|
||||
batch.commit()
|
||||
}
|
||||
|
||||
/// Deletes peers from the storage that satisfy some condition `predicate`
|
||||
pub fn delete_peers<F>(&self, predicate: F) -> Result<(), Error>
|
||||
pub fn delete_peers<F>(mut self, predicate: F) -> Result<(), Error>
|
||||
where
|
||||
F: Fn(&PeerData) -> bool,
|
||||
{
|
||||
let mut to_remove = vec![];
|
||||
|
||||
for x in self.peers_iter()? {
|
||||
if predicate(&x) {
|
||||
to_remove.push(x)
|
||||
if let Ok(x) = x {
|
||||
if predicate(&x) {
|
||||
to_remove.push(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete peers in single batch
|
||||
if !to_remove.is_empty() {
|
||||
let batch = self.db.batch()?;
|
||||
|
||||
for peer in to_remove {
|
||||
batch.delete(&peer_key(peer.addr)[..])?;
|
||||
let key = peer.addr.as_key();
|
||||
self.db.delete(Some(PEER_PREFIX), key.as_bytes())?;
|
||||
}
|
||||
|
||||
batch.commit()?;
|
||||
self.db.commit()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Ignore the port unless ip is loopback address.
|
||||
fn peer_key(peer_addr: PeerAddr) -> Vec<u8> {
|
||||
to_key(PEER_PREFIX, &peer_addr.as_key())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user