NYM-583: Avoid corrupted database on Windows.

NYM-583: Avoid corrupted database on Windows.
This commit is contained in:
Andy Duplain
2026-05-21 14:16:03 +01:00
parent 691280797a
commit 9b285735b8
8 changed files with 185 additions and 41 deletions
Generated
+1 -1
View File
@@ -8243,7 +8243,7 @@ dependencies = [
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-test",
"windows 0.61.3",
]
+10
View File
@@ -25,6 +25,8 @@ pub trait BandwidthTicketProvider: Send + Sync {
) -> Result<PreparedCredential, BandwidthControllerError>;
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError>;
/// Shutdown hook for the provider. The default implementation does nothing;
/// implementors that hold something needing an explicit close override it.
async fn close(&self) {}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -56,6 +58,10 @@ where
.map_err(|_| BandwidthControllerError::MalformedUpgradeModeToken)?;
Ok(Some(token))
}
/// Closes the underlying storage by awaiting its `close` method.
async fn close(&self) {
self.storage.close().await;
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -75,4 +81,8 @@ impl<T: BandwidthTicketProvider + ?Sized + Send> BandwidthTicketProvider for Box
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError> {
(**self).get_upgrade_mode_token().await
}
/// Forwards to the boxed provider's `close`, so boxing does not hide the inner override.
async fn close(&self) {
(**self).close().await;
}
}
@@ -1023,6 +1023,16 @@ where
let encryption_keys = init_res.client_keys.encryption_keypair();
let identity_keys = init_res.client_keys.identity_keypair();
let credential_store_for_close = credential_store.clone();
let close_credential_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
close_credential_token.cancelled().await;
credential_store_for_close.close().await;
},
"CredentialStorage::close_on_shutdown",
);
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
let bandwidth_controller = self
@@ -11,11 +11,17 @@ use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use std::{io, path::Path};
use std::{io, path::Path, time::Duration};
use time::OffsetDateTime;
use tracing::{error, info, trace};
use url::Url;
/// Maximum rename retry attempts when the database file is temporarily locked.
const ARCHIVE_MAX_RETRY_ATTEMPTS: u8 = 15;
/// Delay between archive rename retry attempts.
const ARCHIVE_RETRY_DELAY: Duration = Duration::from_millis(200);
async fn setup_fresh_backend<P: AsRef<Path>>(
db_path: P,
surb_config: &config::ReplySurbs,
@@ -74,13 +80,58 @@ async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()
};
let renamed = db_path.with_extension(new_extension);
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
})
// On Windows, sqlx may release its OS file handles asynchronously after
// pool.close() returns, briefly keeping the file locked
// (ERROR_SHARING_VIOLATION, os error 32). Retry with a short delay to
// give the OS time to flush the remaining handles.
for attempt in 0..ARCHIVE_MAX_RETRY_ATTEMPTS {
match tokio::fs::rename(db_path, &renamed).await {
Ok(()) => return Ok(()),
Err(e) if is_file_locked_error(&e) && (attempt + 1) < ARCHIVE_MAX_RETRY_ATTEMPTS => {
trace!(
"Database file is temporarily locked, retrying archive \
(attempt {}/{}): {e}",
attempt + 1,
ARCHIVE_MAX_RETRY_ATTEMPTS
);
tokio::time::sleep(ARCHIVE_RETRY_DELAY).await;
}
Err(e) => {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
return Err(e);
}
}
}
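// Not reached while ARCHIVE_MAX_RETRY_ATTEMPTS > 0: on the final attempt the retry guard is
// false, so the `Err(e)` arm above returns. This fallback only runs if the loop has no iterations.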
error!(
"Failed to rename corrupt database file after {} attempts: {} to {}",
ARCHIVE_MAX_RETRY_ATTEMPTS,
db_path.display(),
renamed.display()
);
Err(io::Error::other(
"corrupt database archive blocked by persistent file lock",
))
}
/// Reports whether `e` carries one of the Windows "file is locked" OS error codes.
///
/// On Windows this is `true` when the raw OS error is 32 or 33. On every other
/// target the result is always `false`, regardless of the error.
fn is_file_locked_error(e: &io::Error) -> bool {
    // ERROR_SHARING_VIOLATION = 32, ERROR_LOCK_VIOLATION = 33
    cfg!(windows) && matches!(e.raw_os_error(), Some(32 | 33))
}
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
@@ -337,6 +337,8 @@ impl ReplyStorageBackend for Backend {
}
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
self.stop_client_use().await
let result = self.stop_client_use().await;
self.shutdown().await;
result
}
}
+5 -3
View File
@@ -48,6 +48,7 @@ where
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
error!("failed to start the storage session - {err}");
self.backend.stop_storage_session().await.ok();
return;
}
@@ -55,10 +56,11 @@ where
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
error!("failed to flush our reply-related data to the persistent storage: {err}")
} else {
info!("Data flush is complete")
error!("failed to flush our reply-related data to the persistent storage: {err}");
self.backend.stop_storage_session().await.ok();
return;
}
info!("Data flush is complete");
if let Err(err) = self.backend.stop_storage_session().await {
error!("failed to properly stop the storage session - {err}. We might not be able to smoothly restore it")
+1 -1
View File
@@ -41,4 +41,4 @@ windows = { version = "0.61", features = [
[dev-dependencies]
tempfile.workspace = true
tracing-subscriber.workspace = true
tracing-test.workspace = true
+96 -27
View File
@@ -3,8 +3,9 @@
use std::{
io,
ops::{Deref, DerefMut},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
@@ -26,10 +27,8 @@ const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
/// Delay between file checks
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
#[derive(Debug)]
struct SqlitePoolGuardInner {
/// Path to sqlite database file.
database_path: PathBuf,
@@ -37,6 +36,18 @@ pub struct SqlitePoolGuard {
connection_pool: sqlx::SqlitePool,
}
/// `sqlx::SqlitePool` wrapper providing a workaround for a [known sqlx bug](https://github.com/launchbadge/sqlx/issues/3217).
/// After the pool is asked to close, the wrapper polls periodically, waiting for all database files to be closed.
///
/// This type is cheaply [`Clone`]-able: every clone holds the same [`Arc`], so all clones share the
/// same underlying pool and the same reference count. The `Drop` impl only emits a warning when the
/// **last** reference is dropped without an explicit [`close`](Self::close) call, so it is safe to
/// clone this guard temporarily (e.g. to pass into a spawned task) without triggering spurious
/// warnings.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
/// Reference-counted state shared by all clones (database path and connection pool).
inner: Arc<SqlitePoolGuardInner>,
}
impl SqlitePoolGuard {
/// Create new instance providing path to database and connection pool
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
@@ -46,46 +57,70 @@ impl SqlitePoolGuard {
.to_path_buf();
Self {
database_path,
connection_pool,
inner: Arc::new(SqlitePoolGuardInner {
database_path,
connection_pool,
}),
}
}
/// Returns database path
pub fn database_path(&self) -> &Path {
&self.database_path
&self.inner.database_path
}
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
/// Close the underlying sqlite pool and wait for OS file handles to be released.
///
/// **Callers must invoke this method before dropping the guard.** The `Drop` impl does
/// not perform async cleanup; it only logs a warning when the pool was not closed
/// beforehand.
pub async fn close(&self) {
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
if !self.connection_pool.is_closed() {
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
if !self.inner.connection_pool.is_closed() {
tracing::info!(
"Closing sqlite pool: {}",
self.inner.database_path.display()
);
self.close_pool_inner().await.ok();
}
}
async fn close_pool_inner(&self) -> std::io::Result<()> {
self.connection_pool.close().await;
self.inner.connection_pool.close().await;
self.wait_for_db_files_close().await.inspect_err(|e| {
tracing::error!("Failed to wait for file to close: {e}");
})
if let Err(e) = self.wait_for_db_files_close().await {
if e.kind() == std::io::ErrorKind::TimedOut {
tracing::warn!(
"Timed out waiting for OS file handles for sqlite database to be released; \
another connection to the same file may still be open. Path = {}",
self.inner.database_path.display()
);
} else {
tracing::warn!(
"Failed to wait for sqlite database file handles to be released: Path = {}. Error = {}",
self.inner.database_path.display(),
e
);
}
}
Ok(())
}
/// Returns all database files, including shm and wal files.
fn all_database_files(&self) -> Vec<PathBuf> {
let mut database_files = vec![];
let canonical_path = self
.inner
.database_path
.canonicalize()
.inspect_err(|e| {
tracing::error!(
"Failed to canonicalize path: {}. Cause: {e}",
self.database_path.display()
self.inner.database_path.display()
);
})
.unwrap_or(self.database_path.clone());
.unwrap_or(self.inner.database_path.clone());
if let Some(ext) = canonical_path.extension() {
for added_ext in ["-shm", "-wal"] {
@@ -120,34 +155,38 @@ impl SqlitePoolGuard {
}
}
impl Drop for SqlitePoolGuard {
fn drop(&mut self) {
if Arc::strong_count(&self.inner) == 1 && !self.inner.connection_pool.is_closed() {
tracing::warn!(
"SqlitePoolGuard dropped without explicit close(); path={}",
self.inner.database_path.display()
);
}
}
}
impl Deref for SqlitePoolGuard {
type Target = sqlx::SqlitePool;
fn deref(&self) -> &Self::Target {
&self.connection_pool
&self.inner.connection_pool
}
}
impl DerefMut for SqlitePoolGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.connection_pool
}
}
#[cfg(test)]
mod tests {
use sqlx::{
ConnectOptions, Executor,
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
};
use tracing_test::traced_test;
use super::*;
#[traced_test]
#[tokio::test]
async fn test_wait_close() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage.db");
@@ -177,4 +216,34 @@ mod tests {
assert!(guard.close_pool_inner().await.is_ok());
tokio::fs::remove_file(database_path).await.unwrap();
}
#[traced_test]
#[tokio::test]
async fn test_clone_drop_no_warning() {
    // Releasing a temporary clone while the original guard is alive must stay silent.
    let scratch_dir = tempfile::tempdir().unwrap();
    let database_path = scratch_dir.path().join("storage2.db");

    let connect_options = sqlx::sqlite::SqliteConnectOptions::new()
        .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
        .synchronous(SqliteSynchronous::Normal)
        .auto_vacuum(SqliteAutoVacuum::Incremental)
        .filename(&database_path)
        .create_if_missing(true)
        .disable_statement_logging();
    let pool = sqlx::SqlitePool::connect_with(connect_options)
        .await
        .unwrap();
    let guard = SqlitePoolGuard::new(pool);

    // Hold a second handle just long enough to observe the shared count, then release it.
    let short_lived = guard.clone();
    assert_eq!(Arc::strong_count(&guard.inner), 2);
    drop(short_lived);
    assert_eq!(Arc::strong_count(&guard.inner), 1);

    assert!(!logs_contain(
        "SqlitePoolGuard dropped without explicit close"
    ));

    // Close explicitly so the database file can be removed afterwards.
    guard.close().await;
    tokio::fs::remove_file(database_path).await.unwrap();
}
}