Compare commits

...

25 Commits

Author SHA1 Message Date
benedetta davico aeb3c775a0 Update run.sh 2025-07-02 18:14:30 +02:00
benedettadavico b92999fb77 adding other bits to mnemonic arg 2025-07-02 17:57:30 +02:00
benedettadavico 8ef4c667bf add mnemonic 2025-07-02 17:29:59 +02:00
Andrej Mihajlov 8d2cd48da5 test: remove file after closing for a test 2025-06-12 15:39:52 +02:00
Andrej Mihajlov f2eb97514a Fix missing await on self.close_pool_inner() 2025-06-12 15:12:29 +02:00
Andrej Mihajlov 735535d902 Hide tokio behind feature 2025-06-06 15:21:43 +02:00
Andrej Mihajlov f7603a9973 Hide tokio and sqlx behind not(wasm32) 2025-06-06 15:21:43 +02:00
Andrej Mihajlov 693b8d5519 Exclude sqlx-pool-guard from wasm builds 2025-06-06 15:21:43 +02:00
Andrej Mihajlov f96103ab97 Log all tracing output just in case 2025-06-04 11:41:09 +02:00
Andrej Mihajlov b599ededf3 Include proc_pidinfo on iOS 2025-06-04 10:58:08 +02:00
Andrej Mihajlov d2114d3c2e Switch to tracing 2025-06-03 17:27:22 +02:00
Andrej Mihajlov 0356f0c682 Use log here 2025-06-03 15:12:55 +02:00
Andrej Mihajlov 365b12c069 Expose database path 2025-06-03 14:49:35 +02:00
Andrej Mihajlov 032281dc00 Clean up 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 9cc57f8f63 Improve windows 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 64c940a12a Cleanup 2025-06-03 10:09:11 +02:00
Andrej Mihajlov d6f0d50760 Revert 2025-06-03 10:09:11 +02:00
Andrej Mihajlov f6f361299c Use sqlite pool guard 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 3c5677b4ff Remove logs 2025-06-03 10:09:11 +02:00
Andrej Mihajlov b243062695 Document 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 946b10cc30 Add Windows impl 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 1c8831ec17 Add Windows implementation 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 3e606be545 Update Linux impl 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 3f8c2c096b Open file watch 2025-06-03 10:09:11 +02:00
Andrej Mihajlov 3ede03e1d1 Close sqlite pool before erroring 2025-06-03 10:09:11 +02:00
19 changed files with 774 additions and 123 deletions
@@ -57,5 +57,5 @@ jobs:
- name: BuildAndPushImageOnHarbor
run: |
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} --build-arg MNEMONIC="${{ secrets.CANARY_PROBE_MNEMONIC }}" -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
Generated
+126 -6
View File
@@ -5258,6 +5258,7 @@ dependencies = [
"nym-sphinx",
"nym-task",
"sqlx",
"sqlx-pool-guard",
"thiserror 2.0.12",
"time",
"tokio",
@@ -5458,6 +5459,7 @@ dependencies = [
"nym-ecash-time",
"serde",
"sqlx",
"sqlx-pool-guard",
"thiserror 2.0.12",
"tokio",
"zeroize",
@@ -7896,6 +7898,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proc_pidinfo"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af53dad2390f8df98dda1e4188322bdf2f91c86cf6001f51d10d64451edf463a"
dependencies = [
"libc",
]
[[package]]
name = "prometheus"
version = "0.14.0"
@@ -9481,6 +9492,19 @@ dependencies = [
"whoami",
]
[[package]]
name = "sqlx-pool-guard"
version = "0.1.0"
dependencies = [
"proc_pidinfo",
"sqlx",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
"windows 0.61.1",
]
[[package]]
name = "sqlx-postgres"
version = "0.7.4"
@@ -11501,6 +11525,28 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.61.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5ee8f3d025738cb02bad7868bbb5f8a6327501e870bf51f1b455b0a2454a419"
dependencies = [
"windows-collections",
"windows-core 0.61.2",
"windows-future",
"windows-link",
"windows-numerics",
]
[[package]]
name = "windows-collections"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
dependencies = [
"windows-core 0.61.2",
]
[[package]]
name = "windows-core"
version = "0.52.0"
@@ -11535,6 +11581,30 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement 0.60.0",
"windows-interface 0.59.1",
"windows-link",
"windows-result 0.3.4",
"windows-strings 0.4.2",
]
[[package]]
name = "windows-future"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
dependencies = [
"windows-core 0.61.2",
"windows-link",
"windows-threading",
]
[[package]]
name = "windows-implement"
version = "0.57.0"
@@ -11557,6 +11627,17 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
]
[[package]]
name = "windows-interface"
version = "0.57.0"
@@ -11580,10 +11661,31 @@ dependencies = [
]
[[package]]
name = "windows-link"
version = "0.1.0"
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
]
[[package]]
name = "windows-link"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
[[package]]
name = "windows-numerics"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
dependencies = [
"windows-core 0.61.2",
"windows-link",
]
[[package]]
name = "windows-registry"
@@ -11591,7 +11693,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3"
dependencies = [
"windows-result 0.3.1",
"windows-result 0.3.4",
"windows-strings 0.3.1",
"windows-targets 0.53.0",
]
@@ -11616,9 +11718,9 @@ dependencies = [
[[package]]
name = "windows-result"
version = "0.3.1"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06374efe858fab7e4f881500e6e86ec8bc28f9462c47e5a9941a0142ad86b189"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link",
]
@@ -11642,6 +11744,15 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
@@ -11740,6 +11851,15 @@ dependencies = [
"windows_x86_64_msvc 0.53.0",
]
[[package]]
name = "windows-threading"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
dependencies = [
"windows-link",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
+2
View File
@@ -122,6 +122,7 @@ members = [
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"sqlx-pool-guard",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
@@ -281,6 +282,7 @@ petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.16"
publicsuffix = "2.3.0"
proc_pidinfo = "0.1.3"
quote = "1"
rand = "0.8.5"
rand_chacha = "0.3"
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::BadGateway;
use std::io;
use std::path::PathBuf;
use std::{io, path::PathBuf};
use thiserror::Error;
#[derive(Debug, Error)]
@@ -19,7 +18,6 @@ pub enum StorageError {
#[error("failed to perform sqlx migration: {source}")]
MigrationError {
#[source]
#[from]
source: sqlx::migrate::MigrateError,
},
@@ -32,7 +30,6 @@ pub enum StorageError {
#[error("failed to run the SQL query: {source}")]
QueryError {
#[source]
#[from]
source: sqlx::error::Error,
},
@@ -1,20 +1,18 @@
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::replies::reply_storage::{
fs_backend, CombinedReplyStorage, ReplyStorageBackend,
use crate::{
client::replies::reply_storage::{fs_backend, CombinedReplyStorage, ReplyStorageBackend},
config,
config::Config,
error::ClientCoreError,
};
use crate::config;
use crate::config::Config;
use crate::error::ClientCoreError;
use log::{error, info, trace};
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::nyxd;
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::path::Path;
use std::{fs, io};
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use std::{io, path::Path};
use time::OffsetDateTime;
use url::Url;
@@ -22,11 +20,11 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
db_path: P,
surb_config: &config::ReplySurbs,
) -> Result<fs_backend::Backend, ClientCoreError> {
info!("creating fresh surb database");
info!("Creating fresh surb database");
let mut storage_backend = match fs_backend::Backend::init(db_path).await {
Ok(backend) => backend,
Err(err) => {
error!("failed to setup persistent storage backend for our reply needs: {err}");
error!("setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}");
return Err(ClientCoreError::SurbStorageError {
source: Box::new(err),
});
@@ -40,14 +38,15 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
surb_config.minimum_reply_surb_storage_threshold,
surb_config.maximum_reply_surb_storage_threshold,
);
storage_backend
.init_fresh(&mem_store)
.await
.map_err(|err| ClientCoreError::SurbStorageError {
source: Box::new(err),
})?;
Ok(storage_backend)
match storage_backend.init_fresh(&mem_store).await {
Ok(()) => Ok(storage_backend),
Err(err) => {
storage_backend.shutdown().await;
Err(ClientCoreError::SurbStorageError {
source: Box::new(err),
})
}
}
}
// fn setup_inactive_backend(surb_config: &config::ReplySurbs) -> fs_backend::Backend {
@@ -58,12 +57,11 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
// )
// }
fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
async fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
let db_path = db_path.as_ref();
debug_assert!(db_path.exists());
let now = OffsetDateTime::now_utc().unix_timestamp();
let suffix = format!("_{now}.corrupted");
let new_extension =
@@ -72,11 +70,15 @@ fn archive_corrupted_database<P: AsRef<Path>>(db_path: P) -> io::Result<()> {
} else {
suffix
};
let renamed = db_path.with_extension(new_extension);
let mut renamed = db_path.to_owned();
renamed.set_extension(new_extension);
fs::rename(db_path, renamed)
tokio::fs::rename(db_path, &renamed).await.inspect_err(|_| {
error!(
"Failed to rename corrupt database file: {} to {}",
db_path.display(),
renamed.display()
);
})
}
pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
@@ -87,13 +89,12 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
// the existing one
let db_path = db_path.as_ref();
if db_path.exists() {
info!("loading existing surb database");
info!("Loading existing surb database");
match fs_backend::Backend::try_load(db_path, surb_config.fresh_sender_tags).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!("failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
archive_corrupted_database(db_path)?;
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
archive_corrupted_database(db_path).await?;
setup_fresh_backend(db_path, surb_config).await
}
}
+12 -1
View File
@@ -17,15 +17,26 @@ nym-crypto = { path = "../../crypto", optional = true, default-features = false
nym-sphinx = { path = "../../nymsphinx" }
nym-task = { path = "../../task" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
features = ["fs"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
optional = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
path = "../../../sqlx-pool-guard"
[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
[features]
fs-surb-storage = ["sqlx", "nym-crypto", "nym-crypto/hashing"]
@@ -1,8 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::io;
use std::path::PathBuf;
use std::{io, path::PathBuf};
use thiserror::Error;
#[derive(Debug, Error)]
@@ -30,7 +29,6 @@ pub enum StorageError {
#[error("failed to perform sqlx migration: {source}")]
MigrationError {
#[source]
#[from]
source: sqlx::migrate::MigrateError,
},
@@ -43,7 +41,6 @@ pub enum StorageError {
#[error("failed to run the SQL query: {source}")]
QueryError {
#[source]
#[from]
source: sqlx::error::Error,
},
@@ -15,9 +15,11 @@ use sqlx::{
};
use std::path::Path;
use sqlx_pool_guard::SqlitePoolGuard;
#[derive(Debug, Clone)]
pub struct StorageManager {
pub connection_pool: sqlx::SqlitePool,
connection_pool: SqlitePoolGuard,
}
// all SQL goes here
@@ -37,7 +39,7 @@ impl StorageManager {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.filename(&database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -49,11 +51,15 @@ impl StorageManager {
}
};
let connection_pool =
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
if let Err(err) = sqlx::migrate!("./fs_surbs_migrations")
.run(&connection_pool)
.run(&*connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
connection_pool.close().await;
return Err(err.into());
}
@@ -61,38 +67,43 @@ impl StorageManager {
Ok(StorageManager { connection_pool })
}
/// Close connection pool waiting for all connections to be closed.
pub async fn close_pool(&self) {
self.connection_pool.close().await;
}
#[allow(dead_code)]
pub async fn status_table_exists(&self) -> Result<bool, sqlx::Error> {
sqlx::query!("SELECT name FROM sqlite_master WHERE type='table' AND name='status'")
.fetch_optional(&self.connection_pool)
.fetch_optional(&*self.connection_pool)
.await
.map(|r| r.is_some())
}
pub async fn create_status_table(&self) -> Result<(), sqlx::Error> {
sqlx::query!("INSERT INTO status(flush_in_progress, previous_flush_timestamp, client_in_use) VALUES (0, 0, 1)")
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_flush_status(&self) -> Result<bool, sqlx::Error> {
sqlx::query!("SELECT flush_in_progress FROM status;")
.fetch_one(&self.connection_pool)
.fetch_one(&*self.connection_pool)
.await
.map(|r| r.flush_in_progress > 0)
}
pub async fn set_previous_flush_timestamp(&self, timestamp: i64) -> Result<(), sqlx::Error> {
sqlx::query!("UPDATE status SET previous_flush_timestamp = ?", timestamp)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_previous_flush_timestamp(&self) -> Result<i64, sqlx::Error> {
sqlx::query!("SELECT previous_flush_timestamp FROM status;")
.fetch_one(&self.connection_pool)
.fetch_one(&*self.connection_pool)
.await
.map(|r| r.previous_flush_timestamp)
}
@@ -100,14 +111,14 @@ impl StorageManager {
pub async fn set_flush_status(&self, in_progress: bool) -> Result<(), sqlx::Error> {
let in_progress_int = i64::from(in_progress);
sqlx::query!("UPDATE status SET flush_in_progress = ?", in_progress_int)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_client_in_use_status(&self) -> Result<bool, sqlx::Error> {
sqlx::query!("SELECT client_in_use FROM status;")
.fetch_one(&self.connection_pool)
.fetch_one(&*self.connection_pool)
.await
.map(|r| r.client_in_use > 0)
}
@@ -115,21 +126,21 @@ impl StorageManager {
pub async fn set_client_in_use_status(&self, in_use: bool) -> Result<(), sqlx::Error> {
let in_use_int = i64::from(in_use);
sqlx::query!("UPDATE status SET client_in_use = ?", in_use_int)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn delete_all_tags(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM sender_tag;")
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_tags(&self) -> Result<Vec<StoredSenderTag>, sqlx::Error> {
sqlx::query_as!(StoredSenderTag, "SELECT * FROM sender_tag;",)
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
@@ -141,21 +152,21 @@ impl StorageManager {
stored_tag.recipient,
stored_tag.tag
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn delete_all_reply_keys(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM reply_key;")
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_reply_keys(&self) -> Result<Vec<StoredReplyKey>, sqlx::Error> {
sqlx::query_as!(StoredReplyKey, "SELECT * FROM reply_key;",)
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
@@ -171,14 +182,14 @@ impl StorageManager {
stored_reply_key.reply_key,
stored_reply_key.sent_at_timestamp
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_surb_senders(&self) -> Result<Vec<StoredSurbSender>, sqlx::Error> {
sqlx::query_as!(StoredSurbSender, "SELECT * FROM reply_surb_sender;",)
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
@@ -193,7 +204,7 @@ impl StorageManager {
stored_surb_sender.tag,
stored_surb_sender.last_sent_timestamp
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?
.last_insert_rowid();
Ok(id)
@@ -208,17 +219,17 @@ impl StorageManager {
"SELECT * FROM reply_surb WHERE reply_surb_sender_id = ?",
sender_id
)
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
pub async fn delete_all_reply_surb_data(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM reply_surb;")
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
sqlx::query!("DELETE FROM reply_surb_sender;")
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
@@ -235,7 +246,7 @@ impl StorageManager {
stored_reply_surb.reply_surb_sender_id,
stored_reply_surb.reply_surb
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -249,7 +260,7 @@ impl StorageManager {
SELECT min_reply_surb_threshold as "min_reply_surb_threshold: u32", max_reply_surb_threshold as "max_reply_surb_threshold: u32" FROM reply_surb_storage_metadata;
"#,
)
.fetch_one(&self.connection_pool)
.fetch_one(&*self.connection_pool)
.await
}
@@ -263,7 +274,7 @@ impl StorageManager {
"#,
metadata.min_reply_surb_threshold,
metadata.max_reply_surb_threshold,
).execute(&self.connection_pool).await?;
).execute(&*self.connection_pool).await?;
Ok(())
}
}
@@ -1,18 +1,21 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::backend::fs_backend::manager::StorageManager;
use crate::backend::fs_backend::models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag, StoredSurbSender,
};
use crate::surb_storage::ReceivedReplySurbs;
use crate::{
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys, UsedSenderTags,
backend::fs_backend::{
manager::StorageManager,
models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
StoredSurbSender,
},
},
surb_storage::ReceivedReplySurbs,
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys,
UsedSenderTags,
};
use async_trait::async_trait;
use log::{debug, error, info, warn};
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use std::fs;
use std::path::{Path, PathBuf};
use time::OffsetDateTime;
@@ -41,15 +44,17 @@ impl Backend {
}
let manager = StorageManager::init(database_path, true).await?;
manager.create_status_table().await?;
let backend = Backend {
temporary_old_path: None,
database_path: owned_path,
manager,
};
Ok(backend)
match manager.create_status_table().await {
Ok(()) => Ok(Backend {
temporary_old_path: None,
database_path: owned_path,
manager,
}),
Err(err) => {
manager.close_pool().await;
Err(err.into())
}
}
}
pub async fn try_load<P: AsRef<Path>>(
@@ -64,7 +69,28 @@ impl Backend {
}
let manager = StorageManager::init(database_path, false).await?;
match Self::try_load_inner(&manager, fresh_sender_tags).await {
Ok(()) => Ok(Backend {
temporary_old_path: None,
database_path: owned_path,
manager,
}),
Err(e) => {
manager.close_pool().await;
Err(e)
}
}
}
/// Gracefully close sqlite connection pool and drop backend.
pub async fn shutdown(self) {
self.manager.close_pool().await
}
async fn try_load_inner(
manager: &StorageManager,
fresh_sender_tags: bool,
) -> Result<(), StorageError> {
// the database flush wasn't fully finished and thus the data is in inconsistent state
// (we don't really know what's properly saved or what's not)
if manager.get_flush_status().await? {
@@ -126,20 +152,11 @@ impl Backend {
manager.delete_all_tags().await?;
}
Ok(Backend {
temporary_old_path: None,
database_path: owned_path,
// manager: StorageManagerState::Storage(manager),
manager,
})
}
async fn close_pool(&mut self) {
self.manager.connection_pool.close().await;
Ok(())
}
async fn rotate(&mut self) -> Result<(), StorageError> {
self.close_pool().await;
self.manager.close_pool().await;
let new_extension = if let Some(existing_extension) =
self.database_path.extension().and_then(|ext| ext.to_str())
@@ -152,7 +169,8 @@ impl Backend {
let mut temp_old = self.database_path.clone();
temp_old.set_extension(new_extension);
fs::rename(&self.database_path, &temp_old)
tokio::fs::rename(&self.database_path, &temp_old)
.await
.map_err(|err| StorageError::DatabaseRenameError { source: err })?;
self.manager = StorageManager::init(&self.database_path, true).await?;
self.manager.create_status_table().await?;
@@ -161,9 +179,10 @@ impl Backend {
Ok(())
}
fn remove_old(&mut self) -> Result<(), StorageError> {
async fn remove_old(&mut self) -> Result<(), StorageError> {
if let Some(old_path) = self.temporary_old_path.take() {
fs::remove_file(old_path)
tokio::fs::remove_file(old_path)
.await
.map_err(|err| StorageError::DatabaseOldFileRemoveError { source: err })
} else {
warn!("the old database file doesn't seem to exist!");
@@ -335,7 +354,7 @@ impl ReplyStorageBackend for Backend {
self.dump_reply_surb_storage_metadata(surbs_ref).await?;
self.dump_reply_surbs(surbs_ref).await?;
self.remove_old()?;
self.remove_old().await?;
self.end_storage_flush().await
}
+9 -2
View File
@@ -20,6 +20,8 @@ nym-credentials = { path = "../credentials" }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
nym-ecash-time = { path = "../ecash-time" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
path = "../../sqlx-pool-guard"
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
@@ -31,8 +33,13 @@ features = ["rt-multi-thread", "net", "signal", "fs"]
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[features]
persistent-storage = ["bincode", "serde"]
persistent-storage = ["bincode", "serde"]
@@ -7,10 +7,11 @@ use crate::models::{
};
use nym_ecash_time::Date;
use sqlx::{Executor, Sqlite, Transaction};
use sqlx_pool_guard::SqlitePoolGuard;
#[derive(Clone)]
pub struct SqliteEcashTicketbookManager {
connection_pool: sqlx::SqlitePool,
connection_pool: SqlitePoolGuard,
}
impl SqliteEcashTicketbookManager {
@@ -19,7 +20,7 @@ impl SqliteEcashTicketbookManager {
/// # Arguments
///
/// * `connection_pool`: database connection pool to use.
pub fn new(connection_pool: sqlx::SqlitePool) -> Self {
pub fn new(connection_pool: SqlitePoolGuard) -> Self {
SqliteEcashTicketbookManager { connection_pool }
}
@@ -33,7 +34,7 @@ impl SqliteEcashTicketbookManager {
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
deadline
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -60,7 +61,7 @@ impl SqliteEcashTicketbookManager {
data,
expiration_date,
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
@@ -90,7 +91,7 @@ impl SqliteEcashTicketbookManager {
epoch_id,
total_tickets,
used_tickets,
).execute(&self.connection_pool).await?;
).execute(&*self.connection_pool).await?;
Ok(())
}
@@ -106,7 +107,7 @@ impl SqliteEcashTicketbookManager {
"#,
)
.bind(data)
.fetch_optional(&self.connection_pool)
.fetch_optional(&*self.connection_pool)
.await?
.is_some();
@@ -122,7 +123,7 @@ impl SqliteEcashTicketbookManager {
FROM ecash_ticketbook
"#,
)
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
@@ -144,7 +145,7 @@ impl SqliteEcashTicketbookManager {
ticketbook_id,
expected_current_total_spent
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?
.rows_affected();
Ok(affected > 0)
@@ -154,7 +155,7 @@ impl SqliteEcashTicketbookManager {
&self,
) -> Result<Vec<StoredPendingTicketbook>, sqlx::Error> {
sqlx::query_as("SELECT * FROM pending_issuance")
.fetch_all(&self.connection_pool)
.fetch_all(&*self.connection_pool)
.await
}
@@ -166,7 +167,7 @@ impl SqliteEcashTicketbookManager {
"DELETE FROM pending_issuance WHERE deposit_id = ?",
pending_id
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -183,7 +184,7 @@ impl SqliteEcashTicketbookManager {
"#,
epoch_id
)
.fetch_optional(&self.connection_pool)
.fetch_optional(&*self.connection_pool)
.await
}
@@ -209,7 +210,7 @@ impl SqliteEcashTicketbookManager {
serialisation_revision,
epoch_id
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -226,7 +227,7 @@ impl SqliteEcashTicketbookManager {
"#,
epoch_id
)
.fetch_optional(&self.connection_pool)
.fetch_optional(&*self.connection_pool)
.await
}
@@ -252,7 +253,7 @@ impl SqliteEcashTicketbookManager {
serialisation_revision,
epoch_id,
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -270,7 +271,7 @@ impl SqliteEcashTicketbookManager {
"#,
expiration_date
)
.fetch_optional(&self.connection_pool)
.fetch_optional(&*self.connection_pool)
.await
}
@@ -299,7 +300,7 @@ impl SqliteEcashTicketbookManager {
serialisation_revision,
expiration_date
)
.execute(&self.connection_pool)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -37,6 +37,7 @@ use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use sqlx_pool_guard::SqlitePoolGuard;
use std::path::Path;
use zeroize::Zeroizing;
@@ -62,7 +63,7 @@ impl PersistentStorage {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.filename(&database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -74,13 +75,17 @@ impl PersistentStorage {
}
};
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
let connection_pool =
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
if let Err(err) = sqlx::migrate!("./migrations").run(&*connection_pool).await {
error!("Failed to perform migration on the SQLx database: {err}");
connection_pool.close().await;
return Err(err.into());
}
Ok(PersistentStorage {
storage_manager: SqliteEcashTicketbookManager::new(connection_pool.clone()),
storage_manager: SqliteEcashTicketbookManager::new(connection_pool),
})
}
}
@@ -2,6 +2,8 @@
FROM harbor.nymte.ch/dockerhub/rust:latest AS builder
ARG GIT_REF=main
ARG MNEMONIC
ENV MNEMONIC=${MNEMONIC}
RUN apt update && apt install -yy libdbus-1-dev pkg-config libclang-dev
@@ -21,7 +21,7 @@ export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
export NODE_STATUS_AGENT_SERVER_PORT="8000"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT"
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1"
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1,mnemonic=\"${MNEMONIC}\""
workers=${1:-1}
echo "Running $workers workers in parallel"
+36
View File
@@ -0,0 +1,36 @@
[package]
name = "sqlx-pool-guard"
version = "0.1.0"
edition = "2024"
license.workspace = true
[lints]
workspace = true
[dependencies]
tracing.workspace = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
features = ["runtime-tokio-rustls", "sqlite"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
features = ["rt-multi-thread", "macros", "time", "fs"]
[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
proc_pidinfo.workspace = true
[target.'cfg(windows)'.dependencies]
windows = { version = "0.61", features = [
"Win32",
"Win32_System",
"Win32_System_Memory",
"Win32_System_Threading",
"Win32_Storage_FileSystem",
"Wdk_System_SystemInformation",
] }
[dev-dependencies]
tempfile.workspace = true
tracing-subscriber.workspace = true
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{io, path::Path};
use proc_pidinfo::{
ProcFDInfo, ProcFDType, VnodeFdInfoWithPath, proc_pidfdinfo_self, proc_pidinfo_list_self,
};
/// Check if there are no open file descriptors for the given files.
///
/// Uses `proc_pidinfo` (`sys/proc_info.h`)
/// See: http://blog.palominolabs.com/2012/06/19/getting-the-files-being-used-by-a-process-on-mac-os-x/
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
let fd_list = proc_pidinfo_list_self::<ProcFDInfo>()?;
for fd in fd_list
.iter()
.filter(|s| s.fd_type() == Ok(ProcFDType::VNODE))
{
let Some(vnode) = proc_pidfdinfo_self::<VnodeFdInfoWithPath>(fd.proc_fd)
.inspect_err(|e| {
tracing::warn!("proc_pidfdinfo_self::<VnodeFdInfoWithPath>() failure: {e}");
})
.ok()
.flatten()
else {
continue;
};
if let Ok(true) = vnode
.path()
.map(|vnode_path| file_paths.contains(&vnode_path))
.inspect_err(|e| {
tracing::warn!("vnode.path() failure: {e:?}");
})
{
return Ok(false);
}
}
Ok(true)
}
+175
View File
@@ -0,0 +1,175 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{
io,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
time::Duration,
};
#[cfg(windows)]
#[path = "windows.rs"]
mod imp;
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[path = "apple.rs"]
mod imp;
#[cfg(any(target_os = "linux", target_os = "android"))]
#[path = "linux.rs"]
mod imp;
/// Max number of retry attempts
const CHECK_FILES_CLOSED_MAX_ATTEMPTS: u8 = 20;
/// Delay between file checks
const CHECK_FILES_CLOSED_RETRY_DELAY: Duration = Duration::from_millis(100);
/// `sqlx::SqlitePool` wrapper providing a workaround for the [known bug](https://github.com/launchbadge/sqlx/issues/3217).
/// In principle after requesting to close the sqlite pool, the wrapper monitors open file descriptor and polls periodically until all database files are closed.
#[derive(Debug, Clone)]
pub struct SqlitePoolGuard {
/// Path to sqlite database file.
database_path: PathBuf,
/// Inner connection pool.
connection_pool: sqlx::SqlitePool,
}
impl SqlitePoolGuard {
/// Create new instance providing path to database and connection pool
pub fn new(database_path: PathBuf, connection_pool: sqlx::SqlitePool) -> Self {
Self {
database_path,
connection_pool,
}
}
/// Returns database path
pub fn database_path(&self) -> &Path {
&self.database_path
}
/// Close udnerlying sqlite pool and wait for files to be closed before returning.
pub async fn close(&self) {
// Avoid waiting for db files once the pool is marked closed to ensure that we don't wait on some other sqlite pool to close the database.
if !self.connection_pool.is_closed() {
tracing::info!("Closing sqlite pool: {}", self.database_path.display());
self.close_pool_inner().await.ok();
}
}
async fn close_pool_inner(&self) -> std::io::Result<()> {
self.connection_pool.close().await;
self.wait_for_db_files_close().await.inspect_err(|e| {
tracing::error!("Failed to wait for file to close: {e}");
})
}
/// Returns all database files, including shm and wal files.
fn all_database_files(&self) -> Vec<PathBuf> {
let mut database_files = vec![];
let canonical_path = self
.database_path
.canonicalize()
.inspect_err(|e| {
tracing::error!(
"Failed to canonicalize path: {}. Cause: {e}",
self.database_path.display()
);
})
.unwrap_or(self.database_path.clone());
if let Some(ext) = canonical_path.extension() {
for added_ext in ["-shm", "-wal"] {
let mut new_ext = ext.to_owned();
new_ext.push(added_ext);
database_files.push(canonical_path.with_extension(new_ext));
}
}
database_files.push(canonical_path);
database_files
}
/// Wait for database files to be closed before returning.
async fn wait_for_db_files_close(&self) -> std::io::Result<()> {
let database_files = self.all_database_files();
let paths: Vec<&Path> = database_files.iter().map(PathBuf::as_path).collect();
for _ in 0..CHECK_FILES_CLOSED_MAX_ATTEMPTS {
match imp::check_files_closed(&paths)
.await
.inspect_err(|e| tracing::error!("imp::check_files_closed() failure: {e}"))
{
Ok(false) | Err(_) => tokio::time::sleep(CHECK_FILES_CLOSED_RETRY_DELAY).await,
Ok(true) => return Ok(()),
}
}
Err(io::Error::new(
io::ErrorKind::TimedOut,
"timed out waiting for sqlite files to be closed",
))
}
}
impl Deref for SqlitePoolGuard {
type Target = sqlx::SqlitePool;
fn deref(&self) -> &Self::Target {
&self.connection_pool
}
}
impl DerefMut for SqlitePoolGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.connection_pool
}
}
#[cfg(test)]
mod tests {
use sqlx::{
ConnectOptions, Executor,
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
};
use super::*;
#[tokio::test]
async fn test_wait_close() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();
let temp_dir = tempfile::tempdir().unwrap();
let database_path = temp_dir.path().join("storage.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path.clone())
.create_if_missing(true)
.disable_statement_logging();
let connection_pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
connection_pool
.execute("create table test (col int)")
.await
.unwrap();
let guard = SqlitePoolGuard::new(database_path.clone(), connection_pool);
assert!(
guard
.wait_for_db_files_close()
.await
.err()
.is_some_and(|e| e.kind() == io::ErrorKind::TimedOut)
);
assert!(guard.close_pool_inner().await.is_ok());
tokio::fs::remove_file(database_path).await.unwrap();
}
}
+36
View File
@@ -0,0 +1,36 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{io, path::Path};
static PROC_SELF_FD_DIR: &str = "/proc/self/fd/";
/// Check if there are no open file descriptors for the given files.
///
/// Linux, Android: uses `/proc/self/fd/` to list open file descriptors
/// See: https://stackoverflow.com/a/59797198/351305
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
let mut dir = tokio::fs::read_dir(PROC_SELF_FD_DIR).await?;
while let Ok(Some(entry)) = dir.next_entry().await {
if entry
.file_type()
.await
.inspect_err(|e| tracing::warn!("entry.file_type() failure: {e}"))
.is_ok_and(|entry_type| entry_type.is_symlink())
{
match tokio::fs::read_link(entry.path()).await {
Ok(resolved_path) => {
if file_paths.contains(&resolved_path.as_ref()) {
return Ok(false);
}
}
Err(e) => {
tracing::error!("Failed to read symlink: {e}");
}
}
}
}
Ok(true)
}
+188
View File
@@ -0,0 +1,188 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{
ffi::{OsString, c_uchar, c_ulong, c_ushort, c_void},
io,
os::windows::ffi::OsStringExt,
path::{Path, PathBuf},
};
use windows::{
Wdk::System::SystemInformation::{NtQuerySystemInformation, SYSTEM_INFORMATION_CLASS},
Win32::{
Foundation::{HANDLE, MAX_PATH, NTSTATUS, STATUS_INFO_LENGTH_MISMATCH},
Storage::FileSystem::{
FILE_NAME_NORMALIZED, FILE_TYPE_DISK, GetFileType, GetFinalPathNameByHandleW,
},
System::{
Memory::{
GetProcessHeap, HEAP_FLAGS, HEAP_ZERO_MEMORY, HeapAlloc, HeapFree, HeapReAlloc,
},
Threading::GetCurrentProcessId,
},
},
};
/// Private information class used to retrieve open file handles
const SYSTEM_HANDLE_INFORMATION_CLASS: SYSTEM_INFORMATION_CLASS = SYSTEM_INFORMATION_CLASS(0x10);
/// Initial buffer size holding the handle info
/// The number is based on what I observe on a pretty standard Windows 11
const SYSTEM_HANDLE_INFORMATION_INITIAL_SIZE: usize = 2_500_000;
/// Check if there are no open handles to the given files.
///
/// Uses undocumented NT API to obtain open handles on the system.
/// See: https://www.ired.team/miscellaneous-reversing-forensics/windows-kernel-internals/get-all-open-handles-and-kernel-object-address-from-userland
pub async fn check_files_closed(file_paths: &[&Path]) -> io::Result<bool> {
let current_pid = unsafe { GetCurrentProcessId() };
// Allocate info struct on heap with some initial value
let mut reserved_memory = SYSTEM_HANDLE_INFORMATION_INITIAL_SIZE;
let mut handle_table_info = HeapGuard::<SystemHandleInformation>::new(reserved_memory)?;
// Request system handle information
let mut status: NTSTATUS = NTSTATUS::default();
let mut return_len = reserved_memory as u32;
for _ in 0..2 {
status = unsafe {
NtQuerySystemInformation(
SYSTEM_HANDLE_INFORMATION_CLASS,
handle_table_info.as_mut_ptr() as _,
return_len,
&mut return_len,
)
};
// Buffer is too small, resize memory and retry again.
if status == STATUS_INFO_LENGTH_MISMATCH {
tracing::trace!("Buffer is too small ({reserved_memory}), resizing to {return_len}");
reserved_memory = return_len as usize;
handle_table_info.reallocate(reserved_memory)?;
} else {
break;
}
}
status.ok()?;
// Convert returned data into slice
let num_handles = unsafe { (*handle_table_info.inner).number_of_handles };
let proc_entries = unsafe {
std::slice::from_raw_parts(
(*handle_table_info.as_mut_ptr()).handles.as_ptr(),
num_handles as usize,
)
};
// Iterate over open file handle entries
for entry in proc_entries {
if entry.unique_process_id == current_pid {
let file_handle = HANDLE(entry.handle_value as _);
// Filter everything except disk files
if unsafe { GetFileType(file_handle) } == FILE_TYPE_DISK {
// Obtain canonical path for file handle
let mut file_handle_path = vec![0u16; MAX_PATH as usize];
let num_chars_without_nul = unsafe {
GetFinalPathNameByHandleW(
file_handle,
&mut file_handle_path,
FILE_NAME_NORMALIZED,
) as usize
};
if num_chars_without_nul > 0 {
let path_str = OsString::from_wide(&file_handle_path[0..num_chars_without_nul]);
let file_handle_pathbuf = PathBuf::from(path_str);
if file_paths.contains(&file_handle_pathbuf.as_path()) {
return Ok(false);
}
}
}
}
}
Ok(true)
}
#[repr(C)]
#[derive(Copy, Clone)]
struct SystemHandleInformation {
pub number_of_handles: c_ulong,
pub handles: [SystemHandleTableEntryInfo; 1],
}
#[repr(C)]
#[derive(Copy, Clone)]
struct SystemHandleTableEntryInfo {
pub unique_process_id: c_ulong,
pub object_type_index: c_uchar,
pub handle_attributes: c_uchar,
pub handle_value: c_ushort,
pub object: *mut c_void,
pub granted_access: c_ulong,
}
/// Managed heap memory
struct HeapGuard<T> {
inner: *mut T,
process_heap: HANDLE,
}
impl<T> HeapGuard<T> {
/// Allocate new memory using `HealAlloc`
fn new(length: usize) -> io::Result<Self> {
let process_heap = unsafe { GetProcessHeap()? };
let inner: *mut T = unsafe { HeapAlloc(process_heap, HEAP_ZERO_MEMORY, length) as _ };
if inner.is_null() {
Err(io::Error::other("Failed to allocate memory"))
} else {
Ok(Self {
inner,
process_heap,
})
}
}
/// Reallocate existing chunk of memory
///
/// On success: the internal memory pointer is replaced.
/// On failure: the internal memory pointer remains the same and still valid.
fn reallocate(&mut self, new_length: usize) -> io::Result<()> {
let new_ptr: *mut T = unsafe {
HeapReAlloc(
self.process_heap,
HEAP_ZERO_MEMORY,
Some(self.inner as _),
new_length,
) as _
};
if new_ptr.is_null() {
Err(io::Error::other("Failed to reallocate memory"))
} else {
self.inner = new_ptr;
Ok(())
}
}
fn as_mut_ptr(&self) -> *mut T {
self.inner
}
}
impl<T> Drop for HeapGuard<T> {
fn drop(&mut self) {
#[allow(clippy::expect_used)]
unsafe {
HeapFree(
self.process_heap,
HEAP_FLAGS(0),
Some(self.inner as *mut c_void),
)
}
.expect("HeapFree failure");
}
}