From ebcf7feb28dcd7db007c38c91ef9b2d3b40b0076 Mon Sep 17 00:00:00 2001 From: ardocrat Date: Mon, 1 Jun 2026 17:29:53 +0300 Subject: [PATCH] lmdb: migration progress --- chain/src/chain.rs | 5 +++-- chain/src/store.rs | 8 ++++++-- p2p/src/store.rs | 1 + servers/src/common/types.rs | 2 ++ servers/src/grin/server.rs | 20 ++++++++++++++++++-- src/bin/cmd/server.rs | 15 ++++----------- src/bin/tui/ui.rs | 10 +++++++--- store/src/lmdb.rs | 28 +++++++++++++++++++++++++--- 8 files changed, 66 insertions(+), 23 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 11057398..7a55d226 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -43,7 +43,7 @@ use std::collections::HashMap; use std::fs::{self, File}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; use std::time::{Duration, Instant}; /// Orphan pool size is limited by MAX_ORPHAN_SIZE @@ -171,8 +171,9 @@ impl Chain { genesis: Block, pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, archive_mode: bool, + db_migration_prog_tx: Option>, ) -> Result { - let store = Arc::new(store::ChainStore::new(&db_root)?); + let store = Arc::new(store::ChainStore::new(&db_root, db_migration_prog_tx)?); // open the txhashset, creating a new one if necessary let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?; diff --git a/chain/src/store.rs b/chain/src/store.rs index d8df65f2..dff11dde 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -28,7 +28,7 @@ use grin_core::ser; use grin_store as store; use grin_store::{option_to_not_found, Error}; use std::convert::TryInto; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; const STORE_SUBPATH: &str = "chain"; @@ -67,13 +67,17 @@ pub struct ChainStore { impl ChainStore { /// Create new chain store - pub fn new(db_root: &str) -> Result { + pub fn new( + db_root: &str, + db_migration_prog_tx: Option>, + ) -> Result { let db = store::Store::new( db_root, None, Some(STORE_SUBPATH), DB_PREFIXES.to_vec(), None, + db_migration_prog_tx, )?; Ok(ChainStore { db }) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 39018ad5..675d31b8 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -124,6 +124,7 @@ impl PeerStore { Some(STORE_SUBPATH), vec![PEER_PREFIX], None, + None, )?; Ok(PeerStore { db }) } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index a428a5eb..508f2610 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -410,6 +410,8 @@ impl DandelionEpoch { pub enum ServerInitStatus { /// Database loading. LoadDatabase, + /// Database migration progress. + DBMigrationProgress(i8), /// P2P server initialization. StartSync, /// API server initialization. diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index f492ed17..d95ca66d 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -16,6 +16,7 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. +use fs2::FileExt; use std::fs::File; use std::io::prelude::*; use std::path::Path; @@ -25,8 +26,6 @@ use std::{ thread::{self, JoinHandle}, time::{self, Duration}, }; - -use fs2::FileExt; use walkdir::WalkDir; use crate::api; @@ -192,12 +191,29 @@ impl Server { let _ = server_tx.send(ServerInitStatus::LoadDatabase); } + let (db_migration_prog_tx, db_migration_prog_rx) = mpsc::channel::(); + if let Some(ref server_tx) = server_tx { + let server_tx = server_tx.clone(); + thread::spawn(move || loop { + match db_migration_prog_rx.recv() { + Ok(p) => { + if p == 100 { + break; + } + let _ = server_tx.send(ServerInitStatus::DBMigrationProgress(p)); + } + Err(_) => break, + } + }); + } + let shared_chain = Arc::new(chain::Chain::init( config.db_root.clone(), chain_adapter.clone(), genesis.clone(), pow::verify_size, archive_mode, + Some(db_migration_prog_tx), )?); pool_adapter.set_chain(shared_chain.clone()); diff --git a/src/bin/cmd/server.rs b/src/bin/cmd/server.rs index fbca7d47..a511d30c 100644 --- a/src/bin/cmd/server.rs +++ b/src/bin/cmd/server.rs @@ -34,21 +34,13 @@ use grin_util::logger::LogEntry; use grin_util::StopState; use std::sync::mpsc; -/// wrap below to allow UI to clean up on stop +/// Start node server at TUI or non-TUI mode. pub fn start_server( config: servers::ServerConfig, logs_rx: Option>, api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), ) { - exit(start_server_tui(config, logs_rx, api_chan)); -} - -fn start_server_tui( - config: servers::ServerConfig, - logs_rx: Option>, - api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>), -) -> i32 { - if config.run_tui.unwrap_or(false) { + let exit_code = if config.run_tui.unwrap_or(false) { warn!("Starting GRIN in UI mode..."); // Run the UI controller. let (serv_tx, serv_rx) = mpsc::channel::(); @@ -102,7 +94,8 @@ fn start_server_tui( 1 } } - } + }; + exit(exit_code); } /// Handles the server part of the command line, mostly running, starting and diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs index e28876dc..5bdb9fec 100644 --- a/src/bin/tui/ui.rs +++ b/src/bin/tui/ui.rs @@ -247,16 +247,16 @@ impl Controller { let mut exit_code = 0; while self.ui.step() { if let Some(message) = self.rx.try_iter().next() { - match message { + return match message { ControllerMessage::Shutdown => { warn!("Shutdown in progress, please wait"); self.ui.stop(); if let Some(s) = self.server.take() { s.stop(); } - return exit_code; + exit_code } - } + }; } if let Some(m) = self.serv_rx.try_iter().next() { @@ -273,6 +273,10 @@ impl Controller { exit_code = 1; self.init_error(e); } + ServerInitStatus::DBMigrationProgress(p) => { + let status = format!("Migrating database: {}%, please wait...", p); + self.init_status(status.as_str(), true); + } } } diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index 9e8fd4cc..b12a2191 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -19,7 +19,7 @@ use heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, WithoutTls}; use std::collections::HashMap; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::sync::{Arc, OnceLock}; +use std::sync::{mpsc, Arc, OnceLock}; use std::time::Duration; use std::{fs, thread}; @@ -159,6 +159,7 @@ impl Store { db_name: Option<&str>, prefixes: Vec, max_readers: Option, + db_migration_prog_tx: Option>, ) -> Result { let full_path = Path::new(root_path) .join(DEFAULT_MULTI_DB_ENV_NAME) @@ -251,7 +252,8 @@ impl Store { if env_name != DEFAULT_MULTI_DB_ENV_NAME { let migrate_from = Path::new(root_path).join(env_name); if migrate_from.exists() { - match s.migrate_to_default_env(db_name, &migrate_from) { + let _ = s.clear(); + match s.migrate_to_default_env(db_name, &migrate_from, db_migration_prog_tx) { Ok(_) => match fs::remove_dir_all(&migrate_from) { Ok(_) => {} Err(e) => { @@ -283,8 +285,14 @@ impl Store { &self, from_name: Option<&str>, from_path: &Path, + db_migration_prog_tx: Option>, ) -> Result<(), Error> { info!("Migrating DB {:?}, please wait...", from_path); + + if let Some(migration_prog_tx) = &db_migration_prog_tx { + let _ = migration_prog_tx.send(0i8); + } + let from_env = unsafe { let mut options = EnvOpenOptions::new().read_txn_without_tls(); let env_options = options.map_size(self.alloc_chunk_size).max_dbs(24); @@ -307,7 +315,16 @@ impl Store { let mut write_to = self.env.write_txn()?; let read_from = from_env.read_txn()?; let mut count = 0; - for kv in db_from.iter(&read_from)? { + let total = db_from.iter(&read_from)?.count(); + let mut prev_prog = 0; + for (index, kv) in db_from.iter(&read_from)?.enumerate() { + if let Some(migration_prog_tx) = &db_migration_prog_tx { + let prog = 100 * index / total; + if prev_prog != prog && prog != 100 { + prev_prog = prog; + let _ = migration_prog_tx.send(prog as i8); + } + } let (k, v) = kv?; if k.len() > 1 && k[1] == PREFIX_KEY_SEPARATOR { let db_name = k.split_at(1).0; @@ -324,6 +341,11 @@ impl Store { } } write_to.commit()?; + + if let Some(migration_prog_tx) = &db_migration_prog_tx { + let _ = migration_prog_tx.send(100i8); + } + info!("Migrated {} records from {:?}", count, from_path); Ok(()) }