Merge pull request #6812 from nymtech/cherry-pick/nym-583-corrupt-db-windows
NYM-583: Avoid corrupted database on Windows.
This commit is contained in:
Generated
+1
-1
@@ -8243,7 +8243,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
"windows 0.61.3",
|
||||
]
|
||||
|
||||
|
||||
@@ -25,6 +25,8 @@ pub trait BandwidthTicketProvider: Send + Sync {
|
||||
) -> Result<PreparedCredential, BandwidthControllerError>;
|
||||
|
||||
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError>;
|
||||
|
||||
async fn close(&self) {}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -56,6 +58,10 @@ where
|
||||
.map_err(|_| BandwidthControllerError::MalformedUpgradeModeToken)?;
|
||||
Ok(Some(token))
|
||||
}
|
||||
|
||||
async fn close(&self) {
|
||||
self.storage.close().await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -75,4 +81,8 @@ impl<T: BandwidthTicketProvider + ?Sized + Send> BandwidthTicketProvider for Box
|
||||
async fn get_upgrade_mode_token(&self) -> Result<Option<String>, BandwidthControllerError> {
|
||||
(**self).get_upgrade_mode_token().await
|
||||
}
|
||||
|
||||
async fn close(&self) {
|
||||
(**self).close().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1023,6 +1023,16 @@ where
|
||||
let encryption_keys = init_res.client_keys.encryption_keypair();
|
||||
let identity_keys = init_res.client_keys.identity_keypair();
|
||||
|
||||
let credential_store_for_close = credential_store.clone();
|
||||
let close_credential_token = shutdown_tracker.clone_shutdown_token();
|
||||
shutdown_tracker.try_spawn_named(
|
||||
async move {
|
||||
close_credential_token.cancelled().await;
|
||||
credential_store_for_close.close().await;
|
||||
},
|
||||
"CredentialStorage::close_on_shutdown",
|
||||
);
|
||||
|
||||
// the components are started in very specific order. Unless you know what you are doing,
|
||||
// do not change that.
|
||||
let bandwidth_controller = self
|
||||
|
||||
@@ -11,11 +11,17 @@ use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
|
||||
use std::{io, path::Path};
|
||||
use std::{io, path::Path, time::Duration};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info, trace};
|
||||
use url::Url;
|
||||
|
||||
/// Maximum rename retry attempts when the database file is temporarily locked.
|
||||
const ARCHIVE_MAX_RETRY_ATTEMPTS: u8 = 15;
|
||||
|
||||
/// Delay between archive rename retry attempts.
|
||||
const ARCHIVE_RETRY_DELAY: Duration = Duration::from_millis(200);
|
||||
|
||||
async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
db_path: P,
|
||||
surb_config: &config::ReplySurbs,
|
||||
@@ -74,13 +80,58 @@ async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()
|
||||
};
|
||||
let renamed = db_path.with_extension(new_extension);
|
||||
|
||||
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
|
||||
error!(
|
||||
"Failed to rename corrupt database file: {} to {}",
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
})
|
||||
// On Windows, sqlx may release its OS file handles asynchronously after
|
||||
// pool.close() returns, briefly keeping the file locked
|
||||
// (ERROR_SHARING_VIOLATION, os error 32). Retry with a short delay to
|
||||
// give the OS time to flush the remaining handles.
|
||||
for attempt in 0..ARCHIVE_MAX_RETRY_ATTEMPTS {
|
||||
match tokio::fs::rename(db_path, &renamed).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) if is_file_locked_error(&e) && (attempt + 1) < ARCHIVE_MAX_RETRY_ATTEMPTS => {
|
||||
trace!(
|
||||
"Database file is temporarily locked, retrying archive \
|
||||
(attempt {}/{}): {e}",
|
||||
attempt + 1,
|
||||
ARCHIVE_MAX_RETRY_ATTEMPTS
|
||||
);
|
||||
tokio::time::sleep(ARCHIVE_RETRY_DELAY).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to rename corrupt database file: {} to {}",
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reached only when every attempt was blocked by a file lock.
|
||||
error!(
|
||||
"Failed to rename corrupt database file after {} attempts: {} to {}",
|
||||
ARCHIVE_MAX_RETRY_ATTEMPTS,
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
Err(io::Error::other(
|
||||
"corrupt database archive blocked by persistent file lock",
|
||||
))
|
||||
}
|
||||
|
||||
/// Reports whether an I/O error means the file is temporarily locked by another open handle.
///
/// Only Windows surfaces this condition through the two OS error codes checked below; on
/// every other target the function always returns `false`, regardless of the error's code.
///
/// Errors that carry no raw OS code (for example those built with `io::Error::other`) are
/// never treated as lock errors.
fn is_file_locked_error(e: &io::Error) -> bool {
    // Win32 system error codes.
    const ERROR_SHARING_VIOLATION: i32 = 32;
    const ERROR_LOCK_VIOLATION: i32 = 33;

    // `cfg!` expands to a compile-time boolean, so the parameter is used on all targets and
    // no `let _ = e;` workaround is needed to silence unused-variable warnings.
    cfg!(windows)
        && matches!(
            e.raw_os_error(),
            Some(ERROR_SHARING_VIOLATION | ERROR_LOCK_VIOLATION)
        )
}
|
||||
|
||||
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
|
||||
|
||||
@@ -337,6 +337,8 @@ impl ReplyStorageBackend for Backend {
|
||||
}
|
||||
|
||||
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
|
||||
self.stop_client_use().await
|
||||
let result = self.stop_client_use().await;
|
||||
self.shutdown().await;
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ where
|
||||
debug!("Started PersistentReplyStorage");
|
||||
if let Err(err) = self.backend.start_storage_session().await {
|
||||
error!("failed to start the storage session - {err}");
|
||||
self.backend.stop_storage_session().await.ok();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -55,10 +56,11 @@ where
|
||||
|
||||
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
|
||||
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
|
||||
error!("failed to flush our reply-related data to the persistent storage: {err}")
|
||||
} else {
|
||||
info!("Data flush is complete")
|
||||
error!("failed to flush our reply-related data to the persistent storage: {err}");
|
||||
self.backend.stop_storage_session().await.ok();
|
||||
return;
|
||||
}
|
||||
info!("Data flush is complete");
|
||||
|
||||
if let Err(err) = self.backend.stop_storage_session().await {
|
||||
error!("failed to properly stop the storage session - {err}. We might not be able to smoothly restore it")
|
||||
|
||||
@@ -41,4 +41,4 @@ windows = { version = "0.61", features = [
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-test.workspace = true
|
||||
|
||||
@@ -3,8 +3,9 @@
|
||||
|
||||
use std::{
|
||||
io,
|
||||
ops::{Deref, DerefMut},
|
||||
ops::Deref,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -26,10 +27,8 @@ const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
|
||||
/// Delay between file checks
|
||||
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
|
||||
|
||||
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
|
||||
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SqlitePoolGuard {
|
||||
#[derive(Debug)]
|
||||
struct SqlitePoolGuardInner {
|
||||
/// Path to sqlite database file.
|
||||
database_path: PathBuf,
|
||||
|
||||
@@ -37,6 +36,18 @@ pub struct SqlitePoolGuard {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
|
||||
/// In principle, after a request to close the sqlite pool, the wrapper monitors open file descriptors and polls periodically until all database files are closed.
|
||||
///
|
||||
/// This type is cheaply [`Clone`]-able: all clones share the same underlying pool and the same
|
||||
/// reference count. The `Drop` impl only emits a warning when the **last** reference is dropped
|
||||
/// without an explicit [`close`](Self::close) call, so it is safe to clone this guard temporarily
|
||||
/// (e.g. to pass into a spawned task) without triggering spurious warnings.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SqlitePoolGuard {
|
||||
inner: Arc<SqlitePoolGuardInner>,
|
||||
}
|
||||
|
||||
impl SqlitePoolGuard {
|
||||
/// Create new instance providing path to database and connection pool
|
||||
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
|
||||
@@ -46,46 +57,70 @@ impl SqlitePoolGuard {
|
||||
.to_path_buf();
|
||||
|
||||
Self {
|
||||
database_path,
|
||||
connection_pool,
|
||||
inner: Arc::new(SqlitePoolGuardInner {
|
||||
database_path,
|
||||
connection_pool,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns database path
|
||||
pub fn database_path(&self) -> &Path {
|
||||
&self.database_path
|
||||
&self.inner.database_path
|
||||
}
|
||||
|
||||
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
|
||||
/// Close the underlying sqlite pool and wait for OS file handles to be released.
|
||||
///
|
||||
/// **Callers must invoke this method before dropping the guard.** The `Drop` impl does
|
||||
/// not perform async cleanup; it only logs a warning when the pool was not closed
|
||||
/// beforehand.
|
||||
pub async fn close(&self) {
|
||||
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
|
||||
if !self.connection_pool.is_closed() {
|
||||
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
|
||||
if !self.inner.connection_pool.is_closed() {
|
||||
tracing::info!(
|
||||
"Closing sqlite pool: {}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
self.close_pool_inner().await.ok();
|
||||
}
|
||||
}
|
||||
|
||||
async fn close_pool_inner(&self) -> std::io::Result<()> {
|
||||
self.connection_pool.close().await;
|
||||
self.inner.connection_pool.close().await;
|
||||
|
||||
self.wait_for_db_files_close().await.inspect_err(|e| {
|
||||
tracing::error!("Failed to wait for file to close: {e}");
|
||||
})
|
||||
if let Err(e) = self.wait_for_db_files_close().await {
|
||||
if e.kind() == std::io::ErrorKind::TimedOut {
|
||||
tracing::warn!(
|
||||
"Timed out waiting for OS file handles for sqlite database to be released; \
|
||||
another connection to the same file may still be open. Path = {}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"Failed to wait for sqlite database file handles to be released: Path = {}. Error = {}",
|
||||
self.inner.database_path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns all database files, including shm and wal files.
|
||||
fn all_database_files(&self) -> Vec<PathBuf> {
|
||||
let mut database_files = vec![];
|
||||
let canonical_path = self
|
||||
.inner
|
||||
.database_path
|
||||
.canonicalize()
|
||||
.inspect_err(|e| {
|
||||
tracing::error!(
|
||||
"Failed to canonicalize path: {}. Cause: {e}",
|
||||
self.database_path.display()
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
})
|
||||
.unwrap_or(self.database_path.clone());
|
||||
.unwrap_or(self.inner.database_path.clone());
|
||||
|
||||
if let Some(ext) = canonical_path.extension() {
|
||||
for added_ext in ["-shm", "-wal"] {
|
||||
@@ -120,34 +155,38 @@ impl SqlitePoolGuard {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SqlitePoolGuard {
|
||||
fn drop(&mut self) {
|
||||
if Arc::strong_count(&self.inner) == 1 && !self.inner.connection_pool.is_closed() {
|
||||
tracing::warn!(
|
||||
"SqlitePoolGuard dropped without explicit close(); path={}",
|
||||
self.inner.database_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SqlitePoolGuard {
|
||||
type Target = sqlx::SqlitePool;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.connection_pool
|
||||
&self.inner.connection_pool
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SqlitePoolGuard {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.connection_pool
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sqlx::{
|
||||
ConnectOptions, Executor,
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
};
|
||||
use tracing_test::traced_test;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[traced_test]
|
||||
#[tokio::test]
|
||||
async fn test_wait_close() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.init();
|
||||
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let database_path = temp_dir.path().join("storage.db");
|
||||
|
||||
@@ -177,4 +216,34 @@ mod tests {
|
||||
assert!(guard.close_pool_inner().await.is_ok());
|
||||
tokio::fs::remove_file(database_path).await.unwrap();
|
||||
}
|
||||
|
||||
#[traced_test]
|
||||
#[tokio::test]
|
||||
async fn test_clone_drop_no_warning() {
|
||||
// Cloning the guard and dropping the clone should not warn because the original is still alive.
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let database_path = temp_dir.path().join("storage2.db");
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path.clone())
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
|
||||
let guard = SqlitePoolGuard::new(connection_pool);
|
||||
|
||||
{
|
||||
let _clone = guard.clone();
|
||||
assert_eq!(Arc::strong_count(&guard.inner), 2);
|
||||
}
|
||||
assert_eq!(Arc::strong_count(&guard.inner), 1);
|
||||
assert!(!logs_contain(
|
||||
"SqlitePoolGuard dropped without explicit close"
|
||||
));
|
||||
|
||||
guard.close().await;
|
||||
tokio::fs::remove_file(database_path).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user