Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| aeb3c775a0 | |||
| b92999fb77 | |||
| 8ef4c667bf | |||
| 8d2cd48da5 | |||
| f2eb97514a | |||
| 735535d902 | |||
| f7603a9973 | |||
| 693b8d5519 | |||
| f96103ab97 | |||
| b599ededf3 | |||
| d2114d3c2e | |||
| 0356f0c682 | |||
| 365b12c069 | |||
| 032281dc00 | |||
| 9cc57f8f63 | |||
| 64c940a12a | |||
| d6f0d50760 | |||
| f6f361299c | |||
| 3c5677b4ff | |||
| b243062695 | |||
| 946b10cc30 | |||
| 1c8831ec17 | |||
| 3e606be545 | |||
| 3f8c2c096b | |||
| 3ede03e1d1 |
@@ -57,5 +57,5 @@ jobs:
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} --build-arg MNEMONIC="${{ secrets.CANARY_PROBE_MNEMONIC }}" -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
|
||||
Generated
+126
-6
@@ -5258,6 +5258,7 @@ dependencies = [
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"sqlx",
|
||||
"sqlx-pool-guard",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
@@ -5458,6 +5459,7 @@ dependencies = [
|
||||
"nym-ecash-time",
|
||||
"serde",
|
||||
"sqlx",
|
||||
"sqlx-pool-guard",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"zeroize",
|
||||
@@ -7896,6 +7898,15 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc_pidinfo"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af53dad2390f8df98dda1e4188322bdf2f91c86cf6001f51d10d64451edf463a"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus"
|
||||
version = "0.14.0"
|
||||
@@ -9481,6 +9492,19 @@ dependencies = [
|
||||
"whoami",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx-pool-guard"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"proc_pidinfo",
|
||||
"sqlx",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"windows 0.61.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlx-postgres"
|
||||
version = "0.7.4"
|
||||
@@ -11501,6 +11525,28 @@ dependencies = [
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows"
|
||||
version = "0.61.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c5ee8f3d025738cb02bad7868bbb5f8a6327501e870bf51f1b455b0a2454a419"
|
||||
dependencies = [
|
||||
"windows-collections",
|
||||
"windows-core 0.61.2",
|
||||
"windows-future",
|
||||
"windows-link",
|
||||
"windows-numerics",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-collections"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
|
||||
dependencies = [
|
||||
"windows-core 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
@@ -11535,6 +11581,30 @@ dependencies = [
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.61.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
|
||||
dependencies = [
|
||||
"windows-implement 0.60.0",
|
||||
"windows-interface 0.59.1",
|
||||
"windows-link",
|
||||
"windows-result 0.3.4",
|
||||
"windows-strings 0.4.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-future"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
|
||||
dependencies = [
|
||||
"windows-core 0.61.2",
|
||||
"windows-link",
|
||||
"windows-threading",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-implement"
|
||||
version = "0.57.0"
|
||||
@@ -11557,6 +11627,17 @@ dependencies = [
|
||||
"syn 2.0.98",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-implement"
|
||||
version = "0.60.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.98",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-interface"
|
||||
version = "0.57.0"
|
||||
@@ -11580,10 +11661,31 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-link"
|
||||
version = "0.1.0"
|
||||
name = "windows-interface"
|
||||
version = "0.59.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3"
|
||||
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.98",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-link"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
|
||||
|
||||
[[package]]
|
||||
name = "windows-numerics"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
|
||||
dependencies = [
|
||||
"windows-core 0.61.2",
|
||||
"windows-link",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-registry"
|
||||
@@ -11591,7 +11693,7 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3"
|
||||
dependencies = [
|
||||
"windows-result 0.3.1",
|
||||
"windows-result 0.3.4",
|
||||
"windows-strings 0.3.1",
|
||||
"windows-targets 0.53.0",
|
||||
]
|
||||
@@ -11616,9 +11718,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "windows-result"
|
||||
version = "0.3.1"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06374efe858fab7e4f881500e6e86ec8bc28f9462c47e5a9941a0142ad86b189"
|
||||
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
@@ -11642,6 +11744,15 @@ dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-strings"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.45.0"
|
||||
@@ -11740,6 +11851,15 @@ dependencies = [
|
||||
"windows_x86_64_msvc 0.53.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-threading"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.42.2"
|
||||
|
||||
@@ -122,6 +122,7 @@ members = [
|
||||
"service-providers/common",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
"sqlx-pool-guard",
|
||||
"tools/echo-server",
|
||||
"tools/internal/contract-state-importer/importer-cli",
|
||||
"tools/internal/contract-state-importer/importer-contract",
|
||||
@@ -281,6 +282,7 @@ petgraph = "0.6.5"
|
||||
pin-project = "1.1"
|
||||
pin-project-lite = "0.2.16"
|
||||
publicsuffix = "2.3.0"
|
||||
proc_pidinfo = "0.1.3"
|
||||
quote = "1"
|
||||
rand = "0.8.5"
|
||||
rand_chacha = "0.3"
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::BadGateway;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::{io, path::PathBuf};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -19,7 +18,6 @@ pub enum StorageError {
|
||||
|
||||
#[error("failed to perform sqlx migration: {source}")]
|
||||
MigrationError {
|
||||
#[source]
|
||||
#[from]
|
||||
source: sqlx::migrate::MigrateError,
|
||||
},
|
||||
@@ -32,7 +30,6 @@ pub enum StorageError {
|
||||
|
||||
#[error("failed to run the SQL query: {source}")]
|
||||
QueryError {
|
||||
#[source]
|
||||
#[from]
|
||||
source: sqlx::error::Error,
|
||||
},
|
||||
|
||||
@@ -1,20 +1,18 @@
|
||||
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::replies::reply_storage::{
|
||||
fs_backend, CombinedReplyStorage, ReplyStorageBackend,
|
||||
use crate::{
|
||||
client::replies::reply_storage::{fs_backend, CombinedReplyStorage, ReplyStorageBackend},
|
||||
config,
|
||||
config::Config,
|
||||
error::ClientCoreError,
|
||||
};
|
||||
use crate::config;
|
||||
use crate::config::Config;
|
||||
use crate::error::ClientCoreError;
|
||||
use log::{error, info, trace};
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_validator_client::nyxd;
|
||||
use nym_validator_client::QueryHttpRpcNyxdClient;
|
||||
use std::path::Path;
|
||||
use std::{fs, io};
|
||||
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
|
||||
use std::{io, path::Path};
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
|
||||
@@ -22,11 +20,11 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
db_path: P,
|
||||
surb_config: &config::ReplySurbs,
|
||||
) -> Result<fs_backend::Backend, ClientCoreError> {
|
||||
info!("creating fresh surb database");
|
||||
info!("Creating fresh surb database");
|
||||
let mut storage_backend = match fs_backend::Backend::init(db_path).await {
|
||||
Ok(backend) => backend,
|
||||
Err(err) => {
|
||||
error!("failed to setup persistent storage backend for our reply needs: {err}");
|
||||
error!("setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}");
|
||||
return Err(ClientCoreError::SurbStorageError {
|
||||
source: Box::new(err),
|
||||
});
|
||||
@@ -40,14 +38,15 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
surb_config.minimum_reply_surb_storage_threshold,
|
||||
surb_config.maximum_reply_surb_storage_threshold,
|
||||
);
|
||||
storage_backend
|
||||
.init_fresh(&mem_store)
|
||||
.await
|
||||
.map_err(|err| ClientCoreError::SurbStorageError {
|
||||
source: Box::new(err),
|
||||
})?;
|
||||
|
||||
Ok(storage_backend)
|
||||
match storage_backend.init_fresh(&mem_store).await {
|
||||
Ok(()) => Ok(storage_backend),
|
||||
Err(err) => {
|
||||
storage_backend.shutdown().await;
|
||||
Err(ClientCoreError::SurbStorageError {
|
||||
source: Box::new(err),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fn setup_inactive_backend(surb_config: &config::ReplySurbs) -> fs_backend::Backend {
|
||||
@@ -58,12 +57,11 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
// )
|
||||
// }
|
||||
|
||||
fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
|
||||
async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
|
||||
let db_path = db_path.as_ref();
|
||||
debug_assert!(db_path.exists());
|
||||
|
||||
let now = OffsetDateTime::now_utc().unix_timestamp();
|
||||
|
||||
let suffix = format!("_{now}.corrupted");
|
||||
|
||||
let new_extension =
|
||||
@@ -72,11 +70,15 @@ fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
|
||||
} else {
|
||||
suffix
|
||||
};
|
||||
let renamed = db_path.with_extension(new_extension);
|
||||
|
||||
let mut renamed = db_path.to_owned();
|
||||
renamed.set_extension(new_extension);
|
||||
|
||||
fs::rename(db_path, renamed)
|
||||
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
|
||||
error!(
|
||||
"Failed to rename corrupt database file: {} to {}",
|
||||
db_path.display(),
|
||||
renamed.display()
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
|
||||
@@ -87,13 +89,12 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
|
||||
// the existing one
|
||||
let db_path = db_path.as_ref();
|
||||
if db_path.exists() {
|
||||
info!("loading existing surb database");
|
||||
info!("Loading existing surb database");
|
||||
match fs_backend::Backend::try_load(db_path, surb_config.fresh_sender_tags).await {
|
||||
Ok(backend) => Ok(backend),
|
||||
Err(err) => {
|
||||
error!("failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
|
||||
|
||||
archive_corrupted_database(db_path)?;
|
||||
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
|
||||
archive_corrupted_database(db_path).await?;
|
||||
setup_fresh_backend(db_path, surb_config).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,15 +17,26 @@ nym-crypto = { path = "../../crypto", optional = true, default-features = false
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
nym-task = { path = "../../task" }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
workspace = true
|
||||
features = ["fs"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
workspace = true
|
||||
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
|
||||
optional = true
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
|
||||
path = "../../../sqlx-pool-guard"
|
||||
|
||||
[build-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
|
||||
[features]
|
||||
fs-surb-storage = ["sqlx", "nym-crypto", "nym-crypto/hashing"]
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::{io, path::PathBuf};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -30,7 +29,6 @@ pub enum StorageError {
|
||||
|
||||
#[error("failed to perform sqlx migration: {source}")]
|
||||
MigrationError {
|
||||
#[source]
|
||||
#[from]
|
||||
source: sqlx::migrate::MigrateError,
|
||||
},
|
||||
@@ -43,7 +41,6 @@ pub enum StorageError {
|
||||
|
||||
#[error("failed to run the SQL query: {source}")]
|
||||
QueryError {
|
||||
#[source]
|
||||
#[from]
|
||||
source: sqlx::error::Error,
|
||||
},
|
||||
|
||||
@@ -15,9 +15,11 @@ use sqlx::{
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
use sqlx_pool_guard::SqlitePoolGuard;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StorageManager {
|
||||
pub connection_pool: sqlx::SqlitePool,
|
||||
connection_pool: SqlitePoolGuard,
|
||||
}
|
||||
|
||||
// all SQL goes here
|
||||
@@ -37,7 +39,7 @@ impl StorageManager {
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.filename(&database_path)
|
||||
.create_if_missing(fresh)
|
||||
.disable_statement_logging();
|
||||
|
||||
@@ -49,11 +51,15 @@ impl StorageManager {
|
||||
}
|
||||
};
|
||||
|
||||
let connection_pool =
|
||||
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
|
||||
|
||||
if let Err(err) = sqlx::migrate!("./fs_surbs_migrations")
|
||||
.run(&connection_pool)
|
||||
.run(&*connection_pool)
|
||||
.await
|
||||
{
|
||||
error!("Failed to initialize SQLx database: {err}");
|
||||
connection_pool.close().await;
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
@@ -61,38 +67,43 @@ impl StorageManager {
|
||||
Ok(StorageManager { connection_pool })
|
||||
}
|
||||
|
||||
/// Close connection pool waiting for all connections to be closed.
|
||||
pub async fn close_pool(&self) {
|
||||
self.connection_pool.close().await;
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn status_table_exists(&self) -> Result<bool, sqlx::Error> {
|
||||
sqlx::query!("SELECT name FROM sqlite_master WHERE type='table' AND name='status'")
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.fetch_optional(&*self.connection_pool)
|
||||
.await
|
||||
.map(|r| r.is_some())
|
||||
}
|
||||
|
||||
pub async fn create_status_table(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("INSERT INTO status(flush_in_progress, previous_flush_timestamp, client_in_use) VALUES (0, 0, 1)")
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_flush_status(&self) -> Result<bool, sqlx::Error> {
|
||||
sqlx::query!("SELECT flush_in_progress FROM status;")
|
||||
.fetch_one(&self.connection_pool)
|
||||
.fetch_one(&*self.connection_pool)
|
||||
.await
|
||||
.map(|r| r.flush_in_progress > 0)
|
||||
}
|
||||
|
||||
pub async fn set_previous_flush_timestamp(&self, timestamp: i64) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("UPDATE status SET previous_flush_timestamp = ?", timestamp)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_previous_flush_timestamp(&self) -> Result<i64, sqlx::Error> {
|
||||
sqlx::query!("SELECT previous_flush_timestamp FROM status;")
|
||||
.fetch_one(&self.connection_pool)
|
||||
.fetch_one(&*self.connection_pool)
|
||||
.await
|
||||
.map(|r| r.previous_flush_timestamp)
|
||||
}
|
||||
@@ -100,14 +111,14 @@ impl StorageManager {
|
||||
pub async fn set_flush_status(&self, in_progress: bool) -> Result<(), sqlx::Error> {
|
||||
let in_progress_int = i64::from(in_progress);
|
||||
sqlx::query!("UPDATE status SET flush_in_progress = ?", in_progress_int)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_client_in_use_status(&self) -> Result<bool, sqlx::Error> {
|
||||
sqlx::query!("SELECT client_in_use FROM status;")
|
||||
.fetch_one(&self.connection_pool)
|
||||
.fetch_one(&*self.connection_pool)
|
||||
.await
|
||||
.map(|r| r.client_in_use > 0)
|
||||
}
|
||||
@@ -115,21 +126,21 @@ impl StorageManager {
|
||||
pub async fn set_client_in_use_status(&self, in_use: bool) -> Result<(), sqlx::Error> {
|
||||
let in_use_int = i64::from(in_use);
|
||||
sqlx::query!("UPDATE status SET client_in_use = ?", in_use_int)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all_tags(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("DELETE FROM sender_tag;")
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_tags(&self) -> Result<Vec<StoredSenderTag>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredSenderTag, "SELECT * FROM sender_tag;",)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -141,21 +152,21 @@ impl StorageManager {
|
||||
stored_tag.recipient,
|
||||
stored_tag.tag
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all_reply_keys(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("DELETE FROM reply_key;")
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_reply_keys(&self) -> Result<Vec<StoredReplyKey>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredReplyKey, "SELECT * FROM reply_key;",)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -171,14 +182,14 @@ impl StorageManager {
|
||||
stored_reply_key.reply_key,
|
||||
stored_reply_key.sent_at_timestamp
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_surb_senders(&self) -> Result<Vec<StoredSurbSender>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredSurbSender, "SELECT * FROM reply_surb_sender;",)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -193,7 +204,7 @@ impl StorageManager {
|
||||
stored_surb_sender.tag,
|
||||
stored_surb_sender.last_sent_timestamp
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?
|
||||
.last_insert_rowid();
|
||||
Ok(id)
|
||||
@@ -208,17 +219,17 @@ impl StorageManager {
|
||||
"SELECT * FROM reply_surb WHERE reply_surb_sender_id = ?",
|
||||
sender_id
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn delete_all_reply_surb_data(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("DELETE FROM reply_surb;")
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query!("DELETE FROM reply_surb_sender;")
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@@ -235,7 +246,7 @@ impl StorageManager {
|
||||
stored_reply_surb.reply_surb_sender_id,
|
||||
stored_reply_surb.reply_surb
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -249,7 +260,7 @@ impl StorageManager {
|
||||
SELECT min_reply_surb_threshold as "min_reply_surb_threshold: u32", max_reply_surb_threshold as "max_reply_surb_threshold: u32" FROM reply_surb_storage_metadata;
|
||||
"#,
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.fetch_one(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -263,7 +274,7 @@ impl StorageManager {
|
||||
"#,
|
||||
metadata.min_reply_surb_threshold,
|
||||
metadata.max_reply_surb_threshold,
|
||||
).execute(&self.connection_pool).await?;
|
||||
).execute(&*self.connection_pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::backend::fs_backend::manager::StorageManager;
|
||||
use crate::backend::fs_backend::models::{
|
||||
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag, StoredSurbSender,
|
||||
};
|
||||
use crate::surb_storage::ReceivedReplySurbs;
|
||||
use crate::{
|
||||
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys, UsedSenderTags,
|
||||
backend::fs_backend::{
|
||||
manager::StorageManager,
|
||||
models::{
|
||||
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
|
||||
StoredSurbSender,
|
||||
},
|
||||
},
|
||||
surb_storage::ReceivedReplySurbs,
|
||||
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys,
|
||||
UsedSenderTags,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, info, warn};
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
@@ -41,15 +44,17 @@ impl Backend {
|
||||
}
|
||||
|
||||
let manager = StorageManager::init(database_path, true).await?;
|
||||
manager.create_status_table().await?;
|
||||
|
||||
let backend = Backend {
|
||||
temporary_old_path: None,
|
||||
database_path: owned_path,
|
||||
manager,
|
||||
};
|
||||
|
||||
Ok(backend)
|
||||
match manager.create_status_table().await {
|
||||
Ok(()) => Ok(Backend {
|
||||
temporary_old_path: None,
|
||||
database_path: owned_path,
|
||||
manager,
|
||||
}),
|
||||
Err(err) => {
|
||||
manager.close_pool().await;
|
||||
Err(err.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn try_load<P: AsRef<Path>>(
|
||||
@@ -64,7 +69,28 @@ impl Backend {
|
||||
}
|
||||
|
||||
let manager = StorageManager::init(database_path, false).await?;
|
||||
match Self::try_load_inner(&manager, fresh_sender_tags).await {
|
||||
Ok(()) => Ok(Backend {
|
||||
temporary_old_path: None,
|
||||
database_path: owned_path,
|
||||
manager,
|
||||
}),
|
||||
Err(e) => {
|
||||
manager.close_pool().await;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Gracefully close sqlite connection pool and drop backend.
|
||||
pub async fn shutdown(self) {
|
||||
self.manager.close_pool().await
|
||||
}
|
||||
|
||||
async fn try_load_inner(
|
||||
manager: &StorageManager,
|
||||
fresh_sender_tags: bool,
|
||||
) -> Result<(), StorageError> {
|
||||
// the database flush wasn't fully finished and thus the data is in inconsistent state
|
||||
// (we don't really know what's properly saved or what's not)
|
||||
if manager.get_flush_status().await? {
|
||||
@@ -126,20 +152,11 @@ impl Backend {
|
||||
manager.delete_all_tags().await?;
|
||||
}
|
||||
|
||||
Ok(Backend {
|
||||
temporary_old_path: None,
|
||||
database_path: owned_path,
|
||||
// manager: StorageManagerState::Storage(manager),
|
||||
manager,
|
||||
})
|
||||
}
|
||||
|
||||
async fn close_pool(&mut self) {
|
||||
self.manager.connection_pool.close().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn rotate(&mut self) -> Result<(), StorageError> {
|
||||
self.close_pool().await;
|
||||
self.manager.close_pool().await;
|
||||
|
||||
let new_extension = if let Some(existing_extension) =
|
||||
self.database_path.extension().and_then(|ext| ext.to_str())
|
||||
@@ -152,7 +169,8 @@ impl Backend {
|
||||
let mut temp_old = self.database_path.clone();
|
||||
temp_old.set_extension(new_extension);
|
||||
|
||||
fs::rename(&self.database_path, &temp_old)
|
||||
tokio::fs::rename(&self.database_path, &temp_old)
|
||||
.await
|
||||
.map_err(|err| StorageError::DatabaseRenameError { source: err })?;
|
||||
self.manager = StorageManager::init(&self.database_path, true).await?;
|
||||
self.manager.create_status_table().await?;
|
||||
@@ -161,9 +179,10 @@ impl Backend {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_old(&mut self) -> Result<(), StorageError> {
|
||||
async fn remove_old(&mut self) -> Result<(), StorageError> {
|
||||
if let Some(old_path) = self.temporary_old_path.take() {
|
||||
fs::remove_file(old_path)
|
||||
tokio::fs::remove_file(old_path)
|
||||
.await
|
||||
.map_err(|err| StorageError::DatabaseOldFileRemoveError { source: err })
|
||||
} else {
|
||||
warn!("the old database file doesn't seem to exist!");
|
||||
@@ -335,7 +354,7 @@ impl ReplyStorageBackend for Backend {
|
||||
self.dump_reply_surb_storage_metadata(surbs_ref).await?;
|
||||
self.dump_reply_surbs(surbs_ref).await?;
|
||||
|
||||
self.remove_old()?;
|
||||
self.remove_old().await?;
|
||||
self.end_storage_flush().await
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@ nym-credentials = { path = "../credentials" }
|
||||
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
|
||||
nym-ecash-time = { path = "../ecash-time" }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
|
||||
path = "../../sqlx-pool-guard"
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
workspace = true
|
||||
@@ -31,8 +33,13 @@ features = ["rt-multi-thread", "net", "signal", "fs"]
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
|
||||
[features]
|
||||
persistent-storage = ["bincode", "serde"]
|
||||
persistent-storage = ["bincode", "serde"]
|
||||
|
||||
@@ -7,10 +7,11 @@ use crate::models::{
|
||||
};
|
||||
use nym_ecash_time::Date;
|
||||
use sqlx::{Executor, Sqlite, Transaction};
|
||||
use sqlx_pool_guard::SqlitePoolGuard;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteEcashTicketbookManager {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
connection_pool: SqlitePoolGuard,
|
||||
}
|
||||
|
||||
impl SqliteEcashTicketbookManager {
|
||||
@@ -19,7 +20,7 @@ impl SqliteEcashTicketbookManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `connection_pool`: database connection pool to use.
|
||||
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
|
||||
pub fn new(connection_pool: SqlitePoolGuard) -> Self {
|
||||
SqliteEcashTicketbookManager { connection_pool }
|
||||
}
|
||||
|
||||
@@ -33,7 +34,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
|
||||
deadline
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -60,7 +61,7 @@ impl SqliteEcashTicketbookManager {
|
||||
data,
|
||||
expiration_date,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
@@ -90,7 +91,7 @@ impl SqliteEcashTicketbookManager {
|
||||
epoch_id,
|
||||
total_tickets,
|
||||
used_tickets,
|
||||
).execute(&self.connection_pool).await?;
|
||||
).execute(&*self.connection_pool).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -106,7 +107,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"#,
|
||||
)
|
||||
.bind(data)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.fetch_optional(&*self.connection_pool)
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
@@ -122,7 +123,7 @@ impl SqliteEcashTicketbookManager {
|
||||
FROM ecash_ticketbook
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -144,7 +145,7 @@ impl SqliteEcashTicketbookManager {
|
||||
ticketbook_id,
|
||||
expected_current_total_spent
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?
|
||||
.rows_affected();
|
||||
Ok(affected > 0)
|
||||
@@ -154,7 +155,7 @@ impl SqliteEcashTicketbookManager {
|
||||
&self,
|
||||
) -> Result<Vec<StoredPendingTicketbook>, sqlx::Error> {
|
||||
sqlx::query_as("SELECT * FROM pending_issuance")
|
||||
.fetch_all(&self.connection_pool)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -166,7 +167,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"DELETE FROM pending_issuance WHERE deposit_id = ?",
|
||||
pending_id
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -183,7 +184,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"#,
|
||||
epoch_id
|
||||
)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.fetch_optional(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -209,7 +210,7 @@ impl SqliteEcashTicketbookManager {
|
||||
serialisation_revision,
|
||||
epoch_id
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -226,7 +227,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"#,
|
||||
epoch_id
|
||||
)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.fetch_optional(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -252,7 +253,7 @@ impl SqliteEcashTicketbookManager {
|
||||
serialisation_revision,
|
||||
epoch_id,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -270,7 +271,7 @@ impl SqliteEcashTicketbookManager {
|
||||
"#,
|
||||
expiration_date
|
||||
)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.fetch_optional(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -299,7 +300,7 @@ impl SqliteEcashTicketbookManager {
|
||||
serialisation_revision,
|
||||
expiration_date
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use sqlx_pool_guard::SqlitePoolGuard;
|
||||
use std::path::Path;
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
@@ -62,7 +63,7 @@ impl PersistentStorage {
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.filename(&database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
|
||||
@@ -74,13 +75,17 @@ impl PersistentStorage {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
|
||||
let connection_pool =
|
||||
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
|
||||
|
||||
if let Err(err) = sqlx::migrate!("./migrations").run(&*connection_pool).await {
|
||||
error!("Failed to perform migration on the SQLx database: {err}");
|
||||
connection_pool.close().await;
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
Ok(PersistentStorage {
|
||||
storage_manager: SqliteEcashTicketbookManager::new(connection_pool.clone()),
|
||||
storage_manager: SqliteEcashTicketbookManager::new(connection_pool),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
FROM harbor.nymte.ch/dockerhub/rust:latest AS builder
|
||||
|
||||
ARG GIT_REF=main
|
||||
ARG MNEMONIC
|
||||
ENV MNEMONIC=${MNEMONIC}
|
||||
|
||||
RUN apt update && apt install -yy libdbus-1-dev pkg-config libclang-dev
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
|
||||
export NODE_STATUS_AGENT_SERVER_PORT="8000"
|
||||
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
|
||||
export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT"
|
||||
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1"
|
||||
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1,mnemonic=\"${MNEMONIC}\""
|
||||
|
||||
workers=${1:-1}
|
||||
echo "Running $workers workers in parallel"
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
[package]
|
||||
name = "sqlx-pool-guard"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
tracing.workspace = true
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
workspace = true
|
||||
features = ["runtime-tokio-rustls", "sqlite"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
workspace = true
|
||||
features = ["rt-multi-thread", "macros", "time", "fs"]
|
||||
|
||||
[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
|
||||
proc_pidinfo.workspace = true
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
windows = { version = "0.61", features = [
|
||||
"Win32",
|
||||
"Win32_System",
|
||||
"Win32_System_Memory",
|
||||
"Win32_System_Threading",
|
||||
"Win32_Storage_FileSystem",
|
||||
"Wdk_System_SystemInformation",
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
@@ -0,0 +1,43 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::{io, path::Path};
|
||||
|
||||
use proc_pidinfo::{
|
||||
ProcFDInfo, ProcFDType, VnodeFdInfoWithPath, proc_pidfdinfo_self, proc_pidinfo_list_self,
|
||||
};
|
||||
|
||||
/// Check if there are no open file descriptors for the given files.
|
||||
///
|
||||
/// Uses `proc_pidinfo` (`sys/proc_info.h`)
|
||||
/// See: http://blog.palominolabs.com/2012/06/19/getting-the-files-being-used-by-a-process-on-mac-os-x/
|
||||
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
|
||||
let fd_list = proc_pidinfo_list_self::<ProcFDInfo>()?;
|
||||
|
||||
for fd in fd_list
|
||||
.iter()
|
||||
.filter(|s| s.fd_type() == Ok(ProcFDType::VNODE))
|
||||
{
|
||||
let Some(vnode) = proc_pidfdinfo_self::<VnodeFdInfoWithPath>(fd.proc_fd)
|
||||
.inspect_err(|e| {
|
||||
tracing::warn!("proc_pidfdinfo_self::<VnodeFdInfoWithPath>() failure: {e}");
|
||||
})
|
||||
.ok()
|
||||
.flatten()
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Ok(true) = vnode
|
||||
.path()
|
||||
.map(|vnode_path| file_paths.contains(&vnode_path))
|
||||
.inspect_err(|e| {
|
||||
tracing::warn!("vnode.path() failure: {e:?}");
|
||||
})
|
||||
{
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::{
|
||||
io,
|
||||
ops::{Deref, DerefMut},
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[cfg(windows)]
|
||||
#[path = "windows.rs"]
|
||||
mod imp;
|
||||
|
||||
#[cfg(any(target_os = "macos", target_os = "ios"))]
|
||||
#[path = "apple.rs"]
|
||||
mod imp;
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
#[path = "linux.rs"]
|
||||
mod imp;
|
||||
|
||||
/// Max number of retry attempts
|
||||
const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
|
||||
|
||||
/// Delay between file checks
|
||||
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
|
||||
|
||||
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
|
||||
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SqlitePoolGuard {
|
||||
/// Path to sqlite database file.
|
||||
database_path: PathBuf,
|
||||
|
||||
/// Inner connection pool.
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
impl SqlitePoolGuard {
|
||||
/// Create new instance providing path to database and connection pool
|
||||
pub fn new(database_path: PathBuf, connection_pool: sqlx::SqlitePool) -> Self {
|
||||
Self {
|
||||
database_path,
|
||||
connection_pool,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns database path
|
||||
pub fn database_path(&self) -> &Path {
|
||||
&self.database_path
|
||||
}
|
||||
|
||||
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
|
||||
pub async fn close(&self) {
|
||||
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
|
||||
if !self.connection_pool.is_closed() {
|
||||
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
|
||||
self.close_pool_inner().await.ok();
|
||||
}
|
||||
}
|
||||
|
||||
async fn close_pool_inner(&self) -> std::io::Result<()> {
|
||||
self.connection_pool.close().await;
|
||||
|
||||
self.wait_for_db_files_close().await.inspect_err(|e| {
|
||||
tracing::error!("Failed to wait for file to close: {e}");
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns all database files, including shm and wal files.
|
||||
fn all_database_files(&self) -> Vec<PathBuf> {
|
||||
let mut database_files = vec![];
|
||||
let canonical_path = self
|
||||
.database_path
|
||||
.canonicalize()
|
||||
.inspect_err(|e| {
|
||||
tracing::error!(
|
||||
"Failed to canonicalize path: {}. Cause: {e}",
|
||||
self.database_path.display()
|
||||
);
|
||||
})
|
||||
.unwrap_or(self.database_path.clone());
|
||||
|
||||
if let Some(ext) = canonical_path.extension() {
|
||||
for added_ext in ["-shm", "-wal"] {
|
||||
let mut new_ext = ext.to_owned();
|
||||
new_ext.push(added_ext);
|
||||
database_files.push(canonical_path.with_extension(new_ext));
|
||||
}
|
||||
}
|
||||
database_files.push(canonical_path);
|
||||
database_files
|
||||
}
|
||||
|
||||
/// Wait for database files to be closed before returning.
|
||||
async fn wait_for_db_files_close(&self) -> std::io::Result<()> {
|
||||
let database_files = self.all_database_files();
|
||||
let paths: Vec<&Path> = database_files.iter().map(PathBuf::as_path).collect();
|
||||
|
||||
for _ in 0..CHECK_FILES_CLOSED_MAX_ATTEMPTS {
|
||||
match imp::check_files_closed(&paths)
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("imp::check_files_closed() failure: {e}"))
|
||||
{
|
||||
Ok(false) | Err(_) => tokio::time::sleep(CHECK_FILES_CLOSED_RETRY_DELAY).await,
|
||||
Ok(true) => return Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"timed out waiting for sqlite files to be closed",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SqlitePoolGuard {
|
||||
type Target = sqlx::SqlitePool;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.connection_pool
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SqlitePoolGuard {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.connection_pool
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sqlx::{
|
||||
ConnectOptions, Executor,
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_wait_close() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.init();
|
||||
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let database_path = temp_dir.path().join("storage.db");
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path.clone())
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
|
||||
|
||||
connection_pool
|
||||
.execute("create table test (col int)")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let guard = SqlitePoolGuard::new(database_path.clone(), connection_pool);
|
||||
assert!(
|
||||
guard
|
||||
.wait_for_db_files_close()
|
||||
.await
|
||||
.err()
|
||||
.is_some_and(|e| e.kind() == io::ErrorKind::TimedOut)
|
||||
);
|
||||
|
||||
assert!(guard.close_pool_inner().await.is_ok());
|
||||
tokio::fs::remove_file(database_path).await.unwrap();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::{io, path::Path};
|
||||
|
||||
static PROC_SELF_FD_DIR: &str = "/proc/self/fd/";
|
||||
|
||||
/// Check if there are no open file descriptors for the given files.
|
||||
///
|
||||
/// Linux, Android: uses `/proc/self/fd/` to list open file descriptors
|
||||
/// See: https://stackoverflow.com/a/59797198/351305
|
||||
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
|
||||
let mut dir = tokio::fs::read_dir(PROC_SELF_FD_DIR).await?;
|
||||
|
||||
while let Ok(Some(entry)) = dir.next_entry().await {
|
||||
if entry
|
||||
.file_type()
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!("entry.file_type() failure: {e}"))
|
||||
.is_ok_and(|entry_type| entry_type.is_symlink())
|
||||
{
|
||||
match tokio::fs::read_link(entry.path()).await {
|
||||
Ok(resolved_path) => {
|
||||
if file_paths.contains(&resolved_path.as_ref()) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to read symlink: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::{
|
||||
ffi::{OsString, c_uchar, c_ulong, c_ushort, c_void},
|
||||
io,
|
||||
os::windows::ffi::OsStringExt,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use windows::{
|
||||
Wdk::System::SystemInformation::{NtQuerySystemInformation, SYSTEM_INFORMATION_CLASS},
|
||||
Win32::{
|
||||
Foundation::{HANDLE, MAX_PATH, NTSTATUS, STATUS_INFO_LENGTH_MISMATCH},
|
||||
Storage::FileSystem::{
|
||||
FILE_NAME_NORMALIZED, FILE_TYPE_DISK, GetFileType, GetFinalPathNameByHandleW,
|
||||
},
|
||||
System::{
|
||||
Memory::{
|
||||
GetProcessHeap, HEAP_FLAGS, HEAP_ZERO_MEMORY, HeapAlloc, HeapFree, HeapReAlloc,
|
||||
},
|
||||
Threading::GetCurrentProcessId,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
/// Private information class used to retrieve open file handles
|
||||
const SYSTEM_HANDLE_INFORMATION_CLASS: SYSTEM_INFORMATION_CLASS = SYSTEM_INFORMATION_CLASS(0x10);
|
||||
|
||||
/// Initial buffer size holding the handle info
|
||||
/// The number is based on what I observe on a pretty standard Windows 11
|
||||
const SYSTEM_HANDLE_INFORMATION_INITIAL_SIZE: usize = 2_500_000;
|
||||
|
||||
/// Check if there are no open handles to the given files.
|
||||
///
|
||||
/// Uses undocumented NT API to obtain open handles on the system.
|
||||
/// See: https://www.ired.team/miscellaneous-reversing-forensics/windows-kernel-internals/get-all-open-handles-and-kernel-object-address-from-userland
|
||||
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
|
||||
let current_pid = unsafe { GetCurrentProcessId() };
|
||||
|
||||
// Allocate info struct on heap with some initial value
|
||||
let mut reserved_memory = SYSTEM_HANDLE_INFORMATION_INITIAL_SIZE;
|
||||
let mut handle_table_info = HeapGuard::<SystemHandleInformation>::new(reserved_memory)?;
|
||||
|
||||
// Request system handle information
|
||||
let mut status: NTSTATUS = NTSTATUS::default();
|
||||
let mut return_len = reserved_memory as u32;
|
||||
for _ in 0..2 {
|
||||
status = unsafe {
|
||||
NtQuerySystemInformation(
|
||||
SYSTEM_HANDLE_INFORMATION_CLASS,
|
||||
handle_table_info.as_mut_ptr() as _,
|
||||
return_len,
|
||||
&mut return_len,
|
||||
)
|
||||
};
|
||||
|
||||
// Buffer is too small, resize memory and retry again.
|
||||
if status == STATUS_INFO_LENGTH_MISMATCH {
|
||||
tracing::trace!("Buffer is too small ({reserved_memory}), resizing to {return_len}");
|
||||
reserved_memory = return_len as usize;
|
||||
handle_table_info.reallocate(reserved_memory)?;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
status.ok()?;
|
||||
|
||||
// Convert returned data into slice
|
||||
let num_handles = unsafe { (*handle_table_info.inner).number_of_handles };
|
||||
let proc_entries = unsafe {
|
||||
std::slice::from_raw_parts(
|
||||
(*handle_table_info.as_mut_ptr()).handles.as_ptr(),
|
||||
num_handles as usize,
|
||||
)
|
||||
};
|
||||
|
||||
// Iterate over open file handle entries
|
||||
for entry in proc_entries {
|
||||
if entry.unique_process_id == current_pid {
|
||||
let file_handle = HANDLE(entry.handle_value as _);
|
||||
|
||||
// Filter everything except disk files
|
||||
if unsafe { GetFileType(file_handle) } == FILE_TYPE_DISK {
|
||||
// Obtain canonical path for file handle
|
||||
let mut file_handle_path = vec![0u16; MAX_PATH as usize];
|
||||
let num_chars_without_nul = unsafe {
|
||||
GetFinalPathNameByHandleW(
|
||||
file_handle,
|
||||
&mut file_handle_path,
|
||||
FILE_NAME_NORMALIZED,
|
||||
) as usize
|
||||
};
|
||||
|
||||
if num_chars_without_nul > 0 {
|
||||
let path_str = OsString::from_wide(&file_handle_path[0..num_chars_without_nul]);
|
||||
let file_handle_pathbuf = PathBuf::from(path_str);
|
||||
if file_paths.contains(&file_handle_pathbuf.as_path()) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone)]
|
||||
struct SystemHandleInformation {
|
||||
pub number_of_handles: c_ulong,
|
||||
pub handles: [SystemHandleTableEntryInfo; 1],
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone)]
|
||||
struct SystemHandleTableEntryInfo {
|
||||
pub unique_process_id: c_ulong,
|
||||
pub object_type_index: c_uchar,
|
||||
pub handle_attributes: c_uchar,
|
||||
pub handle_value: c_ushort,
|
||||
pub object: *mut c_void,
|
||||
pub granted_access: c_ulong,
|
||||
}
|
||||
|
||||
/// Managed heap memory
|
||||
struct HeapGuard<T> {
|
||||
inner: *mut T,
|
||||
process_heap: HANDLE,
|
||||
}
|
||||
|
||||
impl<T> HeapGuard<T> {
|
||||
/// Allocate new memory using `HealAlloc`
|
||||
fn new(length: usize) -> io::Result<Self> {
|
||||
let process_heap = unsafe { GetProcessHeap()? };
|
||||
let inner: *mut T = unsafe { HeapAlloc(process_heap, HEAP_ZERO_MEMORY, length) as _ };
|
||||
|
||||
if inner.is_null() {
|
||||
Err(io::Error::other("Failed to allocate memory"))
|
||||
} else {
|
||||
Ok(Self {
|
||||
inner,
|
||||
process_heap,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Reallocate existing chunk of memory
|
||||
///
|
||||
/// On success: the internal memory pointer is replaced.
|
||||
/// On failure: the internal memory pointer remains the same and still valid.
|
||||
fn reallocate(&mut self, new_length: usize) -> io::Result<()> {
|
||||
let new_ptr: *mut T = unsafe {
|
||||
HeapReAlloc(
|
||||
self.process_heap,
|
||||
HEAP_ZERO_MEMORY,
|
||||
Some(self.inner as _),
|
||||
new_length,
|
||||
) as _
|
||||
};
|
||||
|
||||
if new_ptr.is_null() {
|
||||
Err(io::Error::other("Failed to reallocate memory"))
|
||||
} else {
|
||||
self.inner = new_ptr;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn as_mut_ptr(&self) -> *mut T {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for HeapGuard<T> {
|
||||
fn drop(&mut self) {
|
||||
#[allow(clippy::expect_used)]
|
||||
unsafe {
|
||||
HeapFree(
|
||||
self.process_heap,
|
||||
HEAP_FLAGS(0),
|
||||
Some(self.inner as *mut c_void),
|
||||
)
|
||||
}
|
||||
.expect("HeapFree failure");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user