lmdb: migration progress

This commit is contained in:
ardocrat
2026-06-01 17:29:53 +03:00
parent 00685a45c3
commit ebcf7feb28
8 changed files with 66 additions and 23 deletions
+3 -2
View File
@@ -43,7 +43,7 @@ use std::collections::HashMap;
use std::fs::{self, File}; use std::fs::{self, File};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
/// Orphan pool size is limited by MAX_ORPHAN_SIZE /// Orphan pool size is limited by MAX_ORPHAN_SIZE
@@ -171,8 +171,9 @@ impl Chain {
genesis: Block, genesis: Block,
pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
archive_mode: bool, archive_mode: bool,
db_migration_prog_tx: Option<mpsc::Sender<i8>>,
) -> Result<Chain, Error> { ) -> Result<Chain, Error> {
let store = Arc::new(store::ChainStore::new(&db_root)?); let store = Arc::new(store::ChainStore::new(&db_root, db_migration_prog_tx)?);
// open the txhashset, creating a new one if necessary // open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;
+6 -2
View File
@@ -28,7 +28,7 @@ use grin_core::ser;
use grin_store as store; use grin_store as store;
use grin_store::{option_to_not_found, Error}; use grin_store::{option_to_not_found, Error};
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::{mpsc, Arc};
const STORE_SUBPATH: &str = "chain"; const STORE_SUBPATH: &str = "chain";
@@ -67,13 +67,17 @@ pub struct ChainStore {
impl ChainStore { impl ChainStore {
/// Create new chain store /// Create new chain store
pub fn new(db_root: &str) -> Result<ChainStore, Error> { pub fn new(
db_root: &str,
db_migration_prog_tx: Option<mpsc::Sender<i8>>,
) -> Result<ChainStore, Error> {
let db = store::Store::new( let db = store::Store::new(
db_root, db_root,
None, None,
Some(STORE_SUBPATH), Some(STORE_SUBPATH),
DB_PREFIXES.to_vec(), DB_PREFIXES.to_vec(),
None, None,
db_migration_prog_tx,
)?; )?;
Ok(ChainStore { db }) Ok(ChainStore { db })
} }
+1
View File
@@ -124,6 +124,7 @@ impl PeerStore {
Some(STORE_SUBPATH), Some(STORE_SUBPATH),
vec![PEER_PREFIX], vec![PEER_PREFIX],
None, None,
None,
)?; )?;
Ok(PeerStore { db }) Ok(PeerStore { db })
} }
+2
View File
@@ -410,6 +410,8 @@ impl DandelionEpoch {
pub enum ServerInitStatus { pub enum ServerInitStatus {
/// Database loading. /// Database loading.
LoadDatabase, LoadDatabase,
/// Database migration progress.
DBMigrationProgress(i8),
/// P2P server initialization. /// P2P server initialization.
StartSync, StartSync,
/// API server initialization. /// API server initialization.
+18 -2
View File
@@ -16,6 +16,7 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts //! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade. //! as a facade.
use fs2::FileExt;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::path::Path; use std::path::Path;
@@ -25,8 +26,6 @@ use std::{
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::{self, Duration}, time::{self, Duration},
}; };
use fs2::FileExt;
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::api; use crate::api;
@@ -192,12 +191,29 @@ impl Server {
let _ = server_tx.send(ServerInitStatus::LoadDatabase); let _ = server_tx.send(ServerInitStatus::LoadDatabase);
} }
let (db_migration_prog_tx, db_migration_prog_rx) = mpsc::channel::<i8>();
if let Some(ref server_tx) = server_tx {
let server_tx = server_tx.clone();
thread::spawn(move || loop {
match db_migration_prog_rx.recv() {
Ok(p) => {
if p == 100 {
break;
}
let _ = server_tx.send(ServerInitStatus::DBMigrationProgress(p));
}
Err(_) => break,
}
});
}
let shared_chain = Arc::new(chain::Chain::init( let shared_chain = Arc::new(chain::Chain::init(
config.db_root.clone(), config.db_root.clone(),
chain_adapter.clone(), chain_adapter.clone(),
genesis.clone(), genesis.clone(),
pow::verify_size, pow::verify_size,
archive_mode, archive_mode,
Some(db_migration_prog_tx),
)?); )?);
pool_adapter.set_chain(shared_chain.clone()); pool_adapter.set_chain(shared_chain.clone());
+4 -11
View File
@@ -34,21 +34,13 @@ use grin_util::logger::LogEntry;
use grin_util::StopState; use grin_util::StopState;
use std::sync::mpsc; use std::sync::mpsc;
/// wrap below to allow UI to clean up on stop /// Start node server at TUI or non-TUI mode.
pub fn start_server( pub fn start_server(
config: servers::ServerConfig, config: servers::ServerConfig,
logs_rx: Option<mpsc::Receiver<LogEntry>>, logs_rx: Option<mpsc::Receiver<LogEntry>>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) { ) {
exit(start_server_tui(config, logs_rx, api_chan)); let exit_code = if config.run_tui.unwrap_or(false) {
}
fn start_server_tui(
config: servers::ServerConfig,
logs_rx: Option<mpsc::Receiver<LogEntry>>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> i32 {
if config.run_tui.unwrap_or(false) {
warn!("Starting GRIN in UI mode..."); warn!("Starting GRIN in UI mode...");
// Run the UI controller. // Run the UI controller.
let (serv_tx, serv_rx) = mpsc::channel::<ServerInitStatus>(); let (serv_tx, serv_rx) = mpsc::channel::<ServerInitStatus>();
@@ -102,7 +94,8 @@ fn start_server_tui(
1 1
} }
} }
} };
exit(exit_code);
} }
/// Handles the server part of the command line, mostly running, starting and /// Handles the server part of the command line, mostly running, starting and
+7 -3
View File
@@ -247,16 +247,16 @@ impl Controller {
let mut exit_code = 0; let mut exit_code = 0;
while self.ui.step() { while self.ui.step() {
if let Some(message) = self.rx.try_iter().next() { if let Some(message) = self.rx.try_iter().next() {
match message { return match message {
ControllerMessage::Shutdown => { ControllerMessage::Shutdown => {
warn!("Shutdown in progress, please wait"); warn!("Shutdown in progress, please wait");
self.ui.stop(); self.ui.stop();
if let Some(s) = self.server.take() { if let Some(s) = self.server.take() {
s.stop(); s.stop();
} }
return exit_code; exit_code
} }
} };
} }
if let Some(m) = self.serv_rx.try_iter().next() { if let Some(m) = self.serv_rx.try_iter().next() {
@@ -273,6 +273,10 @@ impl Controller {
exit_code = 1; exit_code = 1;
self.init_error(e); self.init_error(e);
} }
ServerInitStatus::DBMigrationProgress(p) => {
let status = format!("Migrating database: {}%, please wait...", p);
self.init_status(status.as_str(), true);
}
} }
} }
+25 -3
View File
@@ -19,7 +19,7 @@ use heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, WithoutTls};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, OnceLock}; use std::sync::{mpsc, Arc, OnceLock};
use std::time::Duration; use std::time::Duration;
use std::{fs, thread}; use std::{fs, thread};
@@ -159,6 +159,7 @@ impl Store {
db_name: Option<&str>, db_name: Option<&str>,
prefixes: Vec<u8>, prefixes: Vec<u8>,
max_readers: Option<u32>, max_readers: Option<u32>,
db_migration_prog_tx: Option<mpsc::Sender<i8>>,
) -> Result<Store, Error> { ) -> Result<Store, Error> {
let full_path = Path::new(root_path) let full_path = Path::new(root_path)
.join(DEFAULT_MULTI_DB_ENV_NAME) .join(DEFAULT_MULTI_DB_ENV_NAME)
@@ -251,7 +252,8 @@ impl Store {
if env_name != DEFAULT_MULTI_DB_ENV_NAME { if env_name != DEFAULT_MULTI_DB_ENV_NAME {
let migrate_from = Path::new(root_path).join(env_name); let migrate_from = Path::new(root_path).join(env_name);
if migrate_from.exists() { if migrate_from.exists() {
match s.migrate_to_default_env(db_name, &migrate_from) { let _ = s.clear();
match s.migrate_to_default_env(db_name, &migrate_from, db_migration_prog_tx) {
Ok(_) => match fs::remove_dir_all(&migrate_from) { Ok(_) => match fs::remove_dir_all(&migrate_from) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
@@ -283,8 +285,14 @@ impl Store {
&self, &self,
from_name: Option<&str>, from_name: Option<&str>,
from_path: &Path, from_path: &Path,
db_migration_prog_tx: Option<mpsc::Sender<i8>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
info!("Migrating DB {:?}, please wait...", from_path); info!("Migrating DB {:?}, please wait...", from_path);
if let Some(migration_prog_tx) = &db_migration_prog_tx {
let _ = migration_prog_tx.send(0i8);
}
let from_env = unsafe { let from_env = unsafe {
let mut options = EnvOpenOptions::new().read_txn_without_tls(); let mut options = EnvOpenOptions::new().read_txn_without_tls();
let env_options = options.map_size(self.alloc_chunk_size).max_dbs(24); let env_options = options.map_size(self.alloc_chunk_size).max_dbs(24);
@@ -307,7 +315,16 @@ impl Store {
let mut write_to = self.env.write_txn()?; let mut write_to = self.env.write_txn()?;
let read_from = from_env.read_txn()?; let read_from = from_env.read_txn()?;
let mut count = 0; let mut count = 0;
for kv in db_from.iter(&read_from)? { let total = db_from.iter(&read_from)?.count();
let mut prev_prog = 0;
for (index, kv) in db_from.iter(&read_from)?.enumerate() {
if let Some(migration_prog_tx) = &db_migration_prog_tx {
let prog = 100 * index / total;
if prev_prog != prog && prog != 100 {
prev_prog = prog;
let _ = migration_prog_tx.send(prog as i8);
}
}
let (k, v) = kv?; let (k, v) = kv?;
if k.len() > 1 && k[1] == PREFIX_KEY_SEPARATOR { if k.len() > 1 && k[1] == PREFIX_KEY_SEPARATOR {
let db_name = k.split_at(1).0; let db_name = k.split_at(1).0;
@@ -324,6 +341,11 @@ impl Store {
} }
} }
write_to.commit()?; write_to.commit()?;
if let Some(migration_prog_tx) = &db_migration_prog_tx {
let _ = migration_prog_tx.send(100i8);
}
info!("Migrated {} records from {:?}", count, from_path); info!("Migrated {} records from {:?}", count, from_path);
Ok(()) Ok(())
} }