Save last scanned block info for wallet scanning (#748)

* scan: save last scanned block info for initial wallet scanning

* scan: save start height at scanned block info on outputs collecting

* scan: 10k blocks batches

* fix: include last height into batch on scan

* scan: save hash of last block

* scan: add larger window (2880 blocks) to scan outputs when updating wallet state

* add test batched scan and resume, add scan doc

* rename corrupted restore test

---------

Co-authored-by: Joerg <wiesche89@googlemail.com>
This commit is contained in:
ardocrat
2026-06-02 21:09:50 +03:00
committed by GitHub
parent 602d79e868
commit 129ad2f5fb
3 changed files with 352 additions and 69 deletions
+215
View File
@@ -26,6 +26,7 @@ use grin_wallet_libwallet as libwallet;
use impls::test_framework::{self, LocalWalletClient};
use impls::{PathToSlate, SlatePutter as _};
use libwallet::{InitTxArgs, NodeClient};
use std::cmp;
use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
@@ -847,6 +848,200 @@ fn output_scanning_impl(test_dir: &'static str) -> Result<(), libwallet::Error>
Ok(())
}
fn multi_batch_scan_impl(test_dir: &'static str) -> Result<(), libwallet::Error> {
let mut wallet_proxy = create_wallet_proxy(test_dir);
let chain = wallet_proxy.chain.clone();
let stopper = wallet_proxy.running.clone();
create_wallet_and_add!(
client1,
wallet1,
mask1_i,
test_dir,
"wallet1",
None,
&mut wallet_proxy,
false
);
let mask1 = (&mask1_i).as_ref();
thread::spawn(move || {
if let Err(e) = wallet_proxy.run() {
error!("Wallet Proxy error: {}", e);
}
});
let reward = consensus::REWARD;
let bh = 12u64;
let _ =
test_framework::award_blocks_to_wallet(&chain, wallet1.clone(), mask1, bh as usize, false);
let mut output_commits = vec![];
wallet::controller::owner_single_use(Some(wallet1.clone()), mask1, None, |api, m| {
let (_, info) = api.retrieve_summary_info(m, true, 1)?;
assert_eq!(info.last_confirmed_height, bh);
assert_eq!(info.total, bh * reward);
output_commits = api.retrieve_outputs(m, false, true, None)?.1;
Ok(())
})?;
{
wallet_inst!(wallet1, w);
let mut batch = w.batch(mask1)?;
batch.delete(&output_commits[8].output.key_id, &None)?;
batch.commit()?;
}
let tip = {
wallet_inst!(wallet1, w);
w.w2n_client().get_chain_tip()?
};
let start_height = 1;
let batch_size = 5;
let mut total_pmmr_range = None;
let mut batches = 0;
let status_send_channel = None;
for h in (start_height..tip.0).step_by((batch_size + 1) as usize) {
let batch_end_height = cmp::min(tip.0, h + batch_size);
let (mut info, range) = libwallet::scan(
wallet1.clone(),
mask1,
false,
h,
batch_end_height,
start_height,
tip.0,
total_pmmr_range,
&status_send_channel,
)?;
info.hash = if batch_end_height == tip.0 {
tip.1.clone()
} else {
"".to_owned()
};
total_pmmr_range = Some(range);
batches += 1;
wallet_inst!(wallet1, w);
let mut batch = w.batch(mask1)?;
batch.save_last_scanned_block(info)?;
batch.commit()?;
}
assert!(batches > 1);
wallet::controller::owner_single_use(Some(wallet1.clone()), mask1, None, |api, m| {
let (_, restored_outputs) = api.retrieve_outputs(m, false, true, None)?;
let (_, info) = api.retrieve_summary_info(m, false, 1)?;
assert_eq!(info.total, bh * reward);
assert_eq!(restored_outputs.len(), bh as usize);
Ok(())
})?;
stopper.store(false, Ordering::Relaxed);
thread::sleep(Duration::from_millis(200));
Ok(())
}
fn restore_corrupted_outputs_across_batches_impl(
test_dir: &'static str,
) -> Result<(), libwallet::Error> {
let mut wallet_proxy = create_wallet_proxy(test_dir);
let chain = wallet_proxy.chain.clone();
let stopper = wallet_proxy.running.clone();
create_wallet_and_add!(
client1,
wallet1,
mask1_i,
test_dir,
"wallet1",
None,
&mut wallet_proxy,
false
);
let mask1 = (&mask1_i).as_ref();
thread::spawn(move || {
if let Err(e) = wallet_proxy.run() {
error!("Wallet Proxy error: {}", e);
}
});
let reward = consensus::REWARD;
let bh = 12u64;
let _ =
test_framework::award_blocks_to_wallet(&chain, wallet1.clone(), mask1, bh as usize, false);
let mut output_commits = vec![];
wallet::controller::owner_single_use(Some(wallet1.clone()), mask1, None, |api, m| {
let (_, info) = api.retrieve_summary_info(m, true, 1)?;
assert_eq!(info.total, bh * reward);
output_commits = api.retrieve_outputs(m, false, true, None)?.1;
Ok(())
})?;
{
wallet_inst!(wallet1, w);
let mut batch = w.batch(mask1)?;
batch.delete(&output_commits[2].output.key_id, &None)?;
batch.delete(&output_commits[8].output.key_id, &None)?;
batch.commit()?;
}
let tip = {
wallet_inst!(wallet1, w);
w.w2n_client().get_chain_tip()?
};
let status_send_channel = None;
let (info, _) = libwallet::scan(
wallet1.clone(),
mask1,
false,
1,
6,
1,
tip.0,
None,
&status_send_channel,
)?;
{
wallet_inst!(wallet1, w);
{
let mut batch = w.batch(mask1)?;
batch.save_last_scanned_block(info)?;
batch.commit()?;
}
let last_scanned = w.last_scanned_block()?;
assert_eq!(last_scanned.height, 6);
assert_eq!(last_scanned.hash, "");
}
wallet::controller::owner_single_use(Some(wallet1.clone()), mask1, None, |api, m| {
let (_, outputs_after_first_batch) = api.retrieve_outputs(m, false, false, None)?;
assert_eq!(outputs_after_first_batch.len(), (bh - 1) as usize);
let (_, info) = api.retrieve_summary_info(m, true, 1)?;
let (_, restored_outputs) = api.retrieve_outputs(m, false, true, None)?;
let unique_keys = restored_outputs
.iter()
.map(|o| o.output.key_id.clone())
.collect::<std::collections::HashSet<_>>();
assert_eq!(info.total, bh * reward);
assert_eq!(restored_outputs.len(), bh as usize);
assert_eq!(unique_keys.len(), bh as usize);
assert!(restored_outputs
.iter()
.all(|o| o.output.status == libwallet::OutputStatus::Unspent));
Ok(())
})?;
stopper.store(false, Ordering::Relaxed);
thread::sleep(Duration::from_millis(200));
Ok(())
}
#[test]
fn scan() {
let test_dir = "test_output/scan";
@@ -876,3 +1071,23 @@ fn output_scanning() {
}
clean_output_dir(test_dir);
}
#[test]
fn multi_batch_scan() {
let test_dir = "test_output/multi_batch_scan";
setup(test_dir);
if let Err(e) = multi_batch_scan_impl(test_dir) {
panic!("Libwallet Error: {}", e);
}
clean_output_dir(test_dir);
}
#[test]
fn restore_corrupted_outputs_across_batches() {
let test_dir = "test_output/restore_corrupted_outputs_across_batches";
setup(test_dir);
if let Err(e) = restore_corrupted_outputs_across_batches_impl(test_dir) {
panic!("Libwallet Error: {}", e);
}
clean_output_dir(test_dir);
}
+61 -36
View File
@@ -14,6 +14,7 @@
//! Generic implementation of owner API functions
use std::cmp;
use uuid::Uuid;
use crate::api_impl::foreign::finalize_tx as foreign_finalize;
@@ -974,25 +975,35 @@ where
w.w2n_client().get_chain_tip()?
};
let start_height = match start_height {
Some(h) => h,
None => 1,
};
let start_height = start_height.unwrap_or_else(|| 1);
let mut info = scan::scan(
wallet_inst.clone(),
keychain_mask,
delete_unconfirmed,
start_height,
tip.0,
status_send_channel,
)?;
info.hash = tip.1;
// Scan every 10k heights to save data between batches in case of interruption.
let mut total_pmmr_range = None;
for h in (start_height..tip.0).step_by(10001) {
let batch_end_height = cmp::min(tip.0, h + 10000);
let (mut info, range) = scan::scan(
wallet_inst.clone(),
keychain_mask,
delete_unconfirmed,
h,
batch_end_height,
start_height,
tip.0,
total_pmmr_range,
status_send_channel,
)?;
info.hash = if batch_end_height == tip.0 {
tip.1.clone()
} else {
"".to_owned()
};
total_pmmr_range = Some(range);
wallet_lock!(wallet_inst, w);
let mut batch = w.batch(keychain_mask)?;
batch.save_last_scanned_block(info)?;
batch.commit()?;
wallet_lock!(wallet_inst, w);
let mut batch = w.batch(keychain_mask)?;
batch.save_last_scanned_block(info)?;
batch.commit()?;
}
Ok(())
}
@@ -1032,6 +1043,9 @@ where
}
}
/// Wallet scan window in blocks (48 hours).
pub const REORG_RESCAN_WINDOW: u64 = 24 * 60 * 2;
/// Experimental, wrap the entire definition of how a wallet's state is updated
pub fn update_wallet_state<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
@@ -1110,23 +1124,24 @@ where
let last_scanned_block = {
wallet_lock!(wallet_inst, w);
match w.init_status()? {
WalletInitStatus::InitNeedsScanning => ScannedBlockInfo {
height: 0,
hash: "".to_owned(),
start_pmmr_index: 0,
last_pmmr_index: 0,
},
WalletInitStatus::InitNoScanning => ScannedBlockInfo {
height: tip.clone().0,
hash: tip.clone().1,
start_pmmr_index: 0,
last_pmmr_index: 0,
},
WalletInitStatus::InitComplete => w.last_scanned_block()?,
_ => w.last_scanned_block()?,
}
};
let start_index = last_scanned_block.height.saturating_sub(100);
let start_height = last_scanned_block
.height
.saturating_sub(REORG_RESCAN_WINDOW);
debug!(
"update_wallet_state: last_scanned_block: {:?}",
last_scanned_block
);
if last_scanned_block.height == 0 {
let msg = "This wallet has not been scanned against the current chain. Beginning full scan... (this first scan may take a while, but subsequent scans will be much quicker)".to_string();
@@ -1135,18 +1150,28 @@ where
}
}
let mut info = scan::scan(
wallet_inst.clone(),
keychain_mask,
false,
start_index,
tip.0,
status_send_channel,
)?;
// Scan every 10k heights to save data between batches in case of interruption.
let mut total_pmmr_range = None;
for h in (start_height..tip.0).step_by(10001) {
let batch_end_height = cmp::min(tip.0, h + 10000);
let (mut info, range) = scan::scan(
wallet_inst.clone(),
keychain_mask,
false,
h,
batch_end_height,
start_height,
tip.0,
total_pmmr_range,
status_send_channel,
)?;
info.hash = if batch_end_height == tip.0 {
tip.1.clone()
} else {
"".to_owned()
};
total_pmmr_range = Some(range);
info.hash = tip.1;
{
wallet_lock!(wallet_inst, w);
let mut batch = w.batch(keychain_mask)?;
batch.save_last_scanned_block(info)?;
+76 -33
View File
@@ -229,26 +229,30 @@ where
fn collect_chain_outputs<'a, C, K>(
keychain: &K,
client: C,
batch_start_index: u64,
batch_end_index: u64,
start_index: u64,
end_index: Option<u64>,
end_index: u64,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<(Vec<OutputResult>, u64), Error>
) -> Result<(Vec<OutputResult>, u64, u8), Error>
where
C: NodeClient + 'a,
K: Keychain + 'a,
{
let batch_size = 1000;
let start_index_stat = start_index;
let mut start_index = start_index;
let mut start_index = batch_start_index;
let mut result_vec: Vec<OutputResult> = vec![];
let last_retrieved_return_index;
let mut perc_complete;
loop {
let (highest_index, last_retrieved_index, outputs) =
client.get_outputs_by_pmmr_index(start_index, end_index, batch_size)?;
client.get_outputs_by_pmmr_index(start_index, Some(batch_end_index), batch_size)?;
let range = highest_index as f64 - start_index_stat as f64;
let range = end_index as f64 - start_index_stat as f64;
let progress = last_retrieved_index as f64 - start_index_stat as f64;
let perc_complete = cmp::min(((progress / range) * 100.0) as u8, 99);
perc_complete = cmp::min(((progress / range) * 100.0) as u8, 99);
let msg = format!(
"Checking {} outputs, up to index {}. (Highest index: {})",
@@ -264,16 +268,21 @@ where
keychain,
outputs.clone(),
status_send_channel,
perc_complete as u8,
perc_complete,
)?);
debug!(
"collect_chain_outputs: start_index {}, last_pmmr_index {}",
start_index, last_retrieved_index
);
if highest_index <= last_retrieved_index {
last_retrieved_return_index = last_retrieved_index;
break;
}
start_index = last_retrieved_index + 1;
}
Ok((result_vec, last_retrieved_return_index))
Ok((result_vec, last_retrieved_return_index, perc_complete))
}
///
@@ -451,17 +460,30 @@ where
Ok(chain_outs)
}
/// Check / repair wallet contents by scanning against chain
/// assume wallet contents have been freshly updated with contents
/// of latest block
/// Scan chain outputs and repair the wallet state where needed.
///
/// This is the low-level worker used by the owner API. Callers should normally
/// use the owner scan/update methods instead of calling this directly, unless
/// they need to drive a scan in batches and save progress between those batches.
///
/// `batch_start_height` and `batch_end_height` describe the part of the chain
/// scanned by this call. `start_height` and `end_height` describe the full scan
/// range, so progress and PMMR bounds still reflect the whole scan.
///
/// The returned `ScannedBlockInfo` is the progress marker to persist after the
/// batch. The returned PMMR range can be passed into the next batch to avoid
/// looking up the full range again.
pub fn scan<'a, L, C, K>(
wallet_inst: Arc<Mutex<Box<dyn WalletInst<'a, L, C, K>>>>,
keychain_mask: Option<&SecretKey>,
delete_unconfirmed: bool,
batch_start_height: u64,
batch_end_height: u64,
start_height: u64,
end_height: u64,
total_pmmr_range: Option<(u64, u64)>,
status_send_channel: &Option<Sender<StatusMessage>>,
) -> Result<ScannedBlockInfo, Error>
) -> Result<(ScannedBlockInfo, (u64, u64)), Error>
where
L: WalletLCProvider<'a, C, K>,
C: NodeClient + 'a,
@@ -469,30 +491,46 @@ where
{
// First, get a definitive list of outputs we own from the chain
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning("Starting UTXO scan".to_owned(), 0));
if start_height == batch_start_height {
let _ = s.send(StatusMessage::Scanning("Starting UTXO scan".to_owned(), 0));
}
}
let (client, keychain) = {
wallet_lock!(wallet_inst, w);
(w.w2n_client().clone(), w.keychain(keychain_mask)?)
};
// Retrieve the actual PMMR index range we're looking for
let pmmr_range = client.height_range_to_pmmr_indices(start_height, Some(end_height))?;
// Retrieve total PMMR index range we're looking for
let total_pmmr_range = if let Some(total_range) = total_pmmr_range {
total_range
} else {
client.height_range_to_pmmr_indices(start_height, Some(end_height))?
};
let (chain_outs, last_index) = collect_chain_outputs(
// Retrieve current batch PMMR index range
let pmmr_range =
client.height_range_to_pmmr_indices(batch_start_height, Some(batch_end_height))?;
debug!(
"scan: from: {}, to: {}",
batch_start_height, batch_end_height
);
let (chain_outs, last_index, perc_complete) = collect_chain_outputs(
&keychain,
client,
pmmr_range.0,
Some(pmmr_range.1),
pmmr_range.1,
total_pmmr_range.0,
total_pmmr_range.1,
status_send_channel,
)?;
let msg = format!(
"Identified {} wallet_outputs as belonging to this wallet",
chain_outs.len(),
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
// Now, get all outputs owned by this wallet (regardless of account)
@@ -530,7 +568,7 @@ where
o.value, o.key_id, m.1.commit,
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
o.status = OutputStatus::Unspent;
// any transactions associated with this should be cancelled
@@ -551,7 +589,7 @@ where
m.value, m.key_id, m.commit, m.mmr_index
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
restore_missing_output(
wallet_inst.clone(),
@@ -572,7 +610,7 @@ where
o.value, o.key_id, m.1.commit,
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
o.status = OutputStatus::Unspent;
cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?;
@@ -595,7 +633,7 @@ where
o.value, o.key_id, m.commit,
);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
cancel_tx_log_entry(wallet_inst.clone(), keychain_mask, &o)?;
wallet_lock!(wallet_inst, w);
@@ -616,7 +654,7 @@ where
let label = format!("{}_{}", label_base, acct_index);
let msg = format!("Setting account {} at path {}", label, path);
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::Scanning(msg, 99));
let _ = s.send(StatusMessage::Scanning(msg, perc_complete));
}
keys::set_acct_path(&mut **w, keychain_mask, &label, path)?;
acct_index += 1;
@@ -631,15 +669,20 @@ where
}
if let Some(ref s) = status_send_channel {
let _ = s.send(StatusMessage::ScanningComplete(
"Scanning Complete".to_owned(),
));
if end_height == batch_end_height {
let _ = s.send(StatusMessage::ScanningComplete(
"Scanning Complete".to_owned(),
));
}
}
Ok(ScannedBlockInfo {
height: end_height,
hash: "".to_owned(),
start_pmmr_index: pmmr_range.0,
last_pmmr_index: last_index,
})
Ok((
ScannedBlockInfo {
height: batch_end_height,
hash: "".to_owned(),
start_pmmr_index: pmmr_range.0,
last_pmmr_index: last_index,
},
total_pmmr_range,
))
}