Compare commits

...

39 Commits

Author SHA1 Message Date
benedetta davico 4c449797eb Update Cargo.toml 2025-08-27 08:03:16 +02:00
benedetta davico fcdf0fb580 Update push-node-status-api.yaml 2025-08-27 08:02:55 +02:00
benedettadavico 66d0296f47 update dockerfile to pg 2025-08-26 16:02:11 +02:00
benedettadavico 03bbbf44e9 ns api ci fix 2025-08-26 16:02:11 +02:00
dynco-nym 0a48fa6172 Remove freshness check on testrun submit (#5977)
* Remove freshness check on testrun submit
- freshness is enforced by a background task
  that marks testruns as stale after a
  configured amount of time

* Move code around

* Add humantime

* Update launch script

* Fix typo

* Adjust agent run script

* Configure user agent

* Bump version
2025-08-26 12:26:13 +02:00
p17o 18d9d807f2 Added VPS provider hostraha.com (#5959) 2025-08-22 12:31:31 +00:00
import this 3a7393d316 [DOCs/operators]: Roll back from v2025.15-gruyere to v2025.14-feta (#5973)
* release notes and version bump up

* update stats

* roll version back and comment new notes
2025-08-22 11:23:16 +00:00
import this 6ce5f707c6 [DOCs/operators]: Release notes for v2025.15 gruyere (#5969)
* release notes and version bump up

* update stats
2025-08-21 11:49:52 +00:00
benedetta davico 766a1d4497 Merge pull request #5965 from nymtech/benny/fix-ns-agent-ci
fixing the ci for ns agent
2025-08-21 12:22:05 +02:00
benedetta davico 35c83f0a31 Merge pull request #5967 from nymtech/release/2025.15-gruyere
merge gruyere to develop
2025-08-21 12:20:43 +02:00
benedetta davico 01dd4a7972 Merge pull request #5958 from nymtech/bugfix/linux-build-ci
bugfix: fix ci-build for linux (and use updated runner)
2025-08-21 10:32:51 +02:00
Jędrzej Stuczyński c2e335557e Feature/testing utils (#5963)
* helper wrapper for stream-sink channel

* similar helper for async read/write

* example tests and clippy
2025-08-20 16:17:09 +01:00
benedettadavico 40e1cbc7a9 update changelog 2025-08-20 12:34:04 +02:00
Jędrzej Stuczyński c133e0e88b chore: updated refs to cheddar rev of nym repo (#5955)
* chore: updated refs to cheddar rev of nym repo

* update statistics-api version
2025-08-19 09:28:42 +01:00
Andrej Mihajlov 5b716633de Merge pull request #5960 from nymtech/am/update-strum
Migrate strum to 0.27.2
2025-08-18 18:28:22 +02:00
Andrej Mihajlov 834538300d Migrate to strum 0.27.2 2025-08-18 18:02:41 +02:00
Jędrzej Stuczyński bd0d70f7cd bugfix: fix ci-build for linux (and use updated runner) 2025-08-15 09:37:41 +01:00
Jędrzej Stuczyński 979485c582 http api client adjustment (#5953)
* missing feature lock for attempting to clone client

* added helper macro to generate user agent without additional imports
2025-08-13 12:52:16 +01:00
Bogdan-Ștefan Neacşu d95f66bd90 Move credential verifier in peer controller (#5938)
* Move credential verifier in peer controller

* Send back errors of peer controller
2025-08-13 13:09:44 +03:00
Jędrzej Stuczyński 906dfb2fb0 change PK/FK on expiration date signatures tables (#5934)
* update nym-credential-proxy

* update credential-storage

* update nym-api

* clippy
2025-08-12 09:03:53 +01:00
Jędrzej Stuczyński 7daa726626 feat: introduce additional checks when attempting to send to bounded channels (#5941)
* feat: introduce additional checks when attempting to send to bounded channels

or to a fallible gateway

* return error rather than panic when merging socket during shutdown
2025-08-11 09:15:12 +01:00
Jędrzej Stuczyński 067f492ad6 chore: fix rust 1.89 clippy issues (#5944) 2025-08-08 13:03:05 +01:00
Jędrzej Stuczyński ed73ec9ce6 chore: remove unused import (#5942) 2025-08-07 09:56:08 +01:00
import this 61606630bd [DOCs/operators]: Release notes v2025.14-feta (#5935)
* update release

* fix typos
2025-08-06 08:24:38 +00:00
benedettadavico 2d3deeb424 bump versions 2025-08-06 09:56:08 +02:00
benedetta davico 3827dc357d Merge pull request #5936 from nymtech/release/2025.14-feta
Merge release/2025.14-feta to develop
2025-08-06 09:54:29 +02:00
benedetta davico a70e9e23d3 Merge branch 'develop' into release/2025.14-feta 2025-08-06 09:48:22 +02:00
Jędrzej Stuczyński dc59149a5d squashed feature/ecash-liveness-check (#5890) (#5890)
delay to gruyere

chore: delay to Feta

added threshold information to the response

nym api test clippy

bugfixes and endpoint improvements

expose results on api endpoints

wip: making nym api monitor network signers

added fallback legacy queries to get basic support idea

refactored the code to expose bool-only methods for status

ecash-signer-check lib for obtaining basic ecash signer information
2025-08-05 12:28:42 +01:00
benedetta davico e418c7587a Merge pull request #5914 from nymtech/feature/nym-node-gw-reset
nym-node debug command to reset providers db
2025-08-05 12:05:44 +02:00
import this 33339c085d [DOCs/operators]: Update ISP list (#5918)
* update ISP list

* remove typo
2025-07-31 13:47:27 +00:00
Sachin Kamath 863f329106 docs: update validator instructions and waitlist callout (#5922) 2025-07-30 15:03:39 +00:00
import this 314a37cabe WG exit policy scripts update (#5921)
* add NIP-3 ports to WG manager script

* add monero ports to local testing script

* console output snippet update
2025-07-30 09:43:39 +00:00
Jack Wampler 917f391948 Make DNS Resolver fallback optional (#5920)
default to no dns system fallback, but keep support
2025-07-29 11:00:24 -06:00
Jędrzej Stuczyński 0b4deda621 nym-node debug command to reset providers db 2025-07-25 13:33:12 +01:00
benedetta davico d01867ca8d save version 2025-07-25 11:27:42 +02:00
benedetta davico 502c63b291 Fix broken CI 2025-07-25 11:19:27 +02:00
Jędrzej Stuczyński a4e674c98b basic zulip client for sending messages (#5913) 2025-07-24 16:22:35 +01:00
Jędrzej Stuczyński 7f97f13799 chore: nym node tokio console (#5909)
* conditionally enable console-subscriber within nym-node

* Update ci-build-upload-binaries.yml

* Update ci-build-upload-binaries.yml

add features console

* updated feature name

* fixed filtering on tracing layers

* add track_caller when spawning futures for better tokio-console support

* allow [client] tasks to specify their names when used within tokio console

* clippy

* pre-emptively fix wasm clippy

---------

Co-authored-by: Tommy Verrall <60836166+tommyv1987@users.noreply.github.com>
2025-07-24 11:00:58 +01:00
benedettadavico 85604e8305 bump versions 2025-07-23 10:18:45 +02:00
232 changed files with 8561 additions and 5482 deletions
@@ -38,15 +38,14 @@ jobs:
rm -rf ci-builds || true
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Sets env vars for tokio if set in manual dispatch inputs
run: |
echo 'RUSTFLAGS="--cfg tokio_unstable"' >> $GITHUB_ENV
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
run: |
echo "RUSTFLAGS=--cfg tokio_unstable" >> $GITHUB_ENV
echo "CARGO_FEATURES=--features tokio-console" >> $GITHUB_ENV
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -103,7 +102,6 @@ jobs:
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
cp target/debian/*.deb $OUTPUT_DIR
fi
- name: Deploy branch to CI www
continue-on-error: true
uses: easingthemes/ssh-deploy@main
+7 -7
View File
@@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-22.04, custom-windows-11, custom-macos-15 ]
os: [ arc-linux-latest, custom-windows-11, custom-macos-15 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
@@ -46,9 +46,9 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler cmake
continue-on-error: true
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
- name: Check out repository code
uses: actions/checkout@v4
@@ -63,7 +63,7 @@ jobs:
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
if: contains(matrix.os, 'linux') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
@@ -93,14 +93,14 @@ jobs:
command: build
- name: Build all examples
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --examples
- name: Run all tests
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
env:
NYM_API: https://sandbox-nym-api1.nymtech.net/api
@@ -109,7 +109,7 @@ jobs:
args: --workspace
- name: Run expensive tests
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'ubuntu')
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
with:
command: test
@@ -40,7 +40,8 @@ jobs:
- name: Get version from cargo.toml
id: get_version
run: |
yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
- name: cleanup-gateway-probe-ref
id: cleanup_gateway_probe_ref
@@ -52,13 +53,16 @@ jobs:
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: Set RELEASE_TAG variable
- name: Initialize RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
+9 -5
View File
@@ -34,18 +34,22 @@ jobs:
- name: Get version from cargo.toml
id: get_version
run: |
yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: Set RELEASE_TAG variable
- name: Initialise RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
@@ -65,6 +69,6 @@ jobs:
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile-sqlite . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+36
View File
@@ -4,6 +4,42 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.15-gruyere] (2025-08-20)
- Migrate strum to 0.27.2 ([#5960])
- WG exit policy scripts update ([#5921])
- Make DNS Resolver fallback optional ([#5920])
- nym-node debug command to reset providers db ([#5914])
- basic zulip client for sending messages ([#5913])
- chore: allow compatibility with 'CDLA-Permissive-2.0' ([#5910])
- feat: ecash liveness check ([#5890])
- Remove old free credential handle ([#5864])
[#5960]: https://github.com/nymtech/nym/pull/5960
[#5921]: https://github.com/nymtech/nym/pull/5921
[#5920]: https://github.com/nymtech/nym/pull/5920
[#5914]: https://github.com/nymtech/nym/pull/5914
[#5913]: https://github.com/nymtech/nym/pull/5913
[#5910]: https://github.com/nymtech/nym/pull/5910
[#5890]: https://github.com/nymtech/nym/pull/5890
[#5864]: https://github.com/nymtech/nym/pull/5864
## [2025.14-feta] (2025-08-05)
- chore: nym node tokio console ([#5909])
- Feature/dkg snapshot epoch ([#5900])
- Feature/dkg epoch dealers query ([#5899])
- sqlx-pool-guard: allocate more memory on windows ([#5896])
- Support mnemonic in the NS agent ([#5883])
- Allow PG database backend ([#5880])
[#5909]: https://github.com/nymtech/nym/pull/5909
[#5900]: https://github.com/nymtech/nym/pull/5900
[#5899]: https://github.com/nymtech/nym/pull/5899
[#5896]: https://github.com/nymtech/nym/pull/5896
[#5883]: https://github.com/nymtech/nym/pull/5883
[#5880]: https://github.com/nymtech/nym/pull/5880
## [2025.13-emmental] (2025-07-22)
- fix: don't allow mixnode running in exit mode ([#5898])
Generated
+615 -1395
View File
File diff suppressed because it is too large Load Diff
+12 -5
View File
@@ -39,7 +39,8 @@ members = [
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract", "common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-storage",
@@ -49,6 +50,8 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-signer-check",
"common/ecash-signer-check-types",
"common/ecash-time",
"common/execute",
"common/exit-policy",
@@ -89,7 +92,7 @@ members = [
"common/socks5/requests",
"common/statistics",
"common/store-cipher",
"common/task",
"common/task", "common/test-utils",
"common/ticketbooks-merkle",
"common/topology",
"common/tun",
@@ -100,6 +103,7 @@ members = [
"common/wasm/utils",
"common/wireguard",
"common/wireguard-types",
"common/zulip-client",
"documentation/autodoc",
"gateway",
"nym-api",
@@ -218,7 +222,7 @@ clap_complete_fig = "4.5"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.15.11"
console-subscriber = "0.1.1"
console-subscriber = "0.4.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
@@ -317,8 +321,8 @@ si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.26"
strum_macros = "0.26"
strum = "0.27.2"
strum_macros = "0.27.2"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
@@ -435,6 +439,9 @@ opt-level = 'z'
# lto = true
opt-level = 'z'
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[workspace.lints.clippy]
unwrap_used = "deny"
expect_used = "deny"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.59"
version = "1.1.61"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -111,7 +111,7 @@ impl SocketClient {
let dkg_query_client = if self.config.base.client.disabled_credentials_mode {
None
} else {
Some(default_query_dkg_client_from_config(&self.config.base))
Some(default_query_dkg_client_from_config(&self.config.base)?)
};
let storage = self.initialise_storage().await?;
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.59"
version = "1.1.61"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+2 -2
View File
@@ -207,7 +207,7 @@ where
<St as Storage>::StorageError: Send + Sync + 'static,
{
if let Some(stored) = storage
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id)
.await
.map_err(BandwidthControllerError::credential_storage_error)?
{
@@ -220,7 +220,7 @@ where
ecash_apis,
|api| async move {
api.api_client
.global_expiration_date_signatures(Some(expiration_date))
.global_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.await
},
format!("aggregated coin index signatures for date {expiration_date}"),
+4
View File
@@ -13,6 +13,7 @@ async-trait = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, optional = true }
cfg-if = { workspace = true }
comfy-table = { workspace = true, optional = true }
futures = { workspace = true }
humantime = { workspace = true }
@@ -123,3 +124,6 @@ fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
[lints]
workspace = true
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -64,6 +64,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.credential_path.unwrap())?
}
};
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.key_path.unwrap())?
}
};
@@ -135,9 +135,11 @@ pub enum ClientInputStatus {
}
impl ClientInputStatus {
#[allow(clippy::panic)]
pub fn register_producer(&mut self) -> ClientInput {
match std::mem::replace(self, ClientInputStatus::Connected) {
ClientInputStatus::AwaitingProducer { client_input } => client_input,
// critical failure implying misuse of software
ClientInputStatus::Connected => panic!("producer was already registered before"),
}
}
@@ -149,9 +151,11 @@ pub enum ClientOutputStatus {
}
impl ClientOutputStatus {
#[allow(clippy::panic)]
pub fn register_consumer(&mut self) -> ClientOutput {
match std::mem::replace(self, ClientOutputStatus::Connected) {
ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
// critical failure implying misuse of software
ClientOutputStatus::Connected => panic!("consumer was already registered before"),
}
}
@@ -707,11 +711,14 @@ where
})?;
let store_clone = mem_store.clone();
spawn_future(async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
});
spawn_future!(
async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
},
"PersistentReplyStorage::flush_on_shutdown"
);
Ok(mem_store)
}
@@ -732,7 +739,7 @@ where
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
.map_err(|_| ClientCoreError::HkdfDerivationError)?
} else {
ClientKeys::generate_new(&mut rng)
};
@@ -114,41 +114,32 @@ pub async fn setup_fs_gateways_storage<P: AsRef<Path>>(
})
}
pub fn create_bandwidth_controller<St: CredentialStorage>(
config: &Config,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
create_bandwidth_controller_with_urls(nyxd_url, storage)
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let client = default_query_dkg_client(nyxd_url);
) -> Result<BandwidthController<QueryHttpRpcNyxdClient, St>, ClientCoreError> {
let client = default_query_dkg_client(nyxd_url)?;
BandwidthController::new(storage, client)
Ok(BandwidthController::new(storage, client))
}
pub fn default_query_dkg_client_from_config(config: &Config) -> QueryHttpRpcNyxdClient {
pub fn default_query_dkg_client_from_config(
config: &Config,
) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
.ok_or(ClientCoreError::RpcClientMissingUrl)?;
default_query_dkg_client(nyxd_url)
}
pub fn default_query_dkg_client(nyxd_url: Url) -> QueryHttpRpcNyxdClient {
pub fn default_query_dkg_client(nyxd_url: Url) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
let details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&details)
.expect("failed to construct validator client config");
.map_err(|source| ClientCoreError::InvalidNetworkDetails { source })?;
// overwrite env configuration with config URLs
QueryHttpRpcNyxdClient::connect(client_config, nyxd_url.as_str())
.expect("Could not construct query client")
.map_err(|source| ClientCoreError::RpcClientCreationFailure { source })
}
@@ -235,6 +235,7 @@ impl LoopCoverTrafficStream<OsRng> {
tokio::task::yield_now().await;
}
#[allow(clippy::panic)]
pub fn start(mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
@@ -251,27 +252,30 @@ impl LoopCoverTrafficStream<OsRng> {
let mut shutdown = self.task_client.fork("select");
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
spawn_future!(
async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
}
}
}
}
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
})
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
},
"LoopCoverTrafficStream"
)
}
}
@@ -96,72 +96,93 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet).await
self.gateway_transceiver.send_mix_packet(mix_packet)
} else {
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
};
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
result
}
}
}
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
}
pub fn start(mut self) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
spawn_future!(
async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {e}"),
};
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
});
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
},
"MixTrafficController"
);
}
}
@@ -269,6 +269,8 @@ pub struct MockGateway {
}
impl Default for MockGateway {
// test code
#[allow(clippy::unwrap_used)]
fn default() -> Self {
MockGateway {
dummy_identity: "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7"
@@ -194,10 +194,11 @@ impl ActionController {
trace!("{frag_id} is updating its delay");
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
// this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// SAFETY: this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// or `ReplyController` (for 'reply' packets) which held the other potential
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
#[allow(clippy::unwrap_used)]
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_retransmitted(delay);
@@ -209,6 +210,7 @@ impl ActionController {
}
// note: when the entry expires it's automatically removed from pending_acks_timers
#[allow(clippy::panic)]
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
let frag_id = expired_ack.into_inner();
@@ -120,6 +120,7 @@ where
}
}
#[allow(clippy::panic)]
async fn on_input_message(&mut self, msg: InputMessage) {
match msg {
InputMessage::Regular {
@@ -213,7 +214,9 @@ where
self.handle_premade_packets(msgs, lane).await
}
// MessageWrappers can't be nested
InputMessage::MessageWrapper { .. } => unimplemented!(),
InputMessage::MessageWrapper { .. } => {
panic!("attempted to use nested MessageWrapper")
}
},
};
}
@@ -223,6 +226,11 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
@@ -232,9 +240,7 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
@@ -298,29 +298,44 @@ where
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
spawn_future(async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
});
spawn_future!(
async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
},
"AcknowledgementController::AcknowledgementListener"
);
spawn_future(async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
});
spawn_future!(
async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
},
"AcknowledgementController::InputMessageListener"
);
spawn_future(async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
});
spawn_future!(
async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
},
"AcknowledgementController::RetransmissionRequestListener"
);
spawn_future(async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
});
spawn_future!(
async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
},
"AcknowledgementController::SentNotificationListener"
);
spawn_future(async move {
action_controller.run().await;
debug!("The controller has finished execution!");
});
spawn_future!(
async move {
action_controller.run().await;
debug!("The controller has finished execution!");
},
"AcknowledgementController::ActionController"
);
}
}
@@ -179,6 +179,11 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
@@ -186,9 +191,7 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
@@ -35,6 +35,9 @@ pub enum PreparationError {
#[error(transparent)]
NymTopologyError(#[from] NymTopologyError),
#[error("message wasn't split into any fragments!")]
EmptyFragments,
#[error("message too long for a single SURB, splitting into {fragments} fragments.")]
MessageTooLongForSingleSurb { fragments: usize },
@@ -320,6 +323,16 @@ where
});
}
if fragment.is_empty() {
error!("CRITICAL FAILURE: our split message didn't result in any sendable fragments");
return Err(SurbWrappedPreparationError {
source: PreparationError::EmptyFragments,
returned_surbs: Some(vec![reply_surb]),
});
}
// SAFETY: we just checked we have one fragment
#[allow(clippy::unwrap_used)]
let chunk = fragment.pop().unwrap();
let chunk_clone = chunk.clone();
let prepared_fragment = self
@@ -535,6 +548,7 @@ where
pending_acks.push(pending_ack);
}
drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
@@ -657,6 +671,7 @@ where
.zip(reply_surbs.into_iter())
.map(|(fragment, reply_surb)| {
// unwrap here is fine as we know we have a valid topology
#[allow(clippy::unwrap_used)]
self.message_preparer
.prepare_reply_chunk_for_sending(
fragment,
@@ -716,17 +731,21 @@ where
// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&self,
&mut self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
}
}
}
@@ -224,14 +224,20 @@ impl RealMessagesController<OsRng> {
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
spawn_future(async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
});
spawn_future(async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
});
spawn_future!(
async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
},
"RealMessagesController::OutQueueControl)"
);
spawn_future!(
async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
},
"RealMessagesController::ReplyController"
);
ack_control.start(packet_type);
}
@@ -249,6 +249,8 @@ where
}
};
// SAFETY: our topology must be valid at this point
#[allow(clippy::expect_used)]
(
generate_loop_cover_packet(
&mut self.rng,
@@ -278,17 +280,33 @@ where
}
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send: {err}");
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};
match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -439,6 +457,8 @@ where
tracing::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -487,6 +507,8 @@ where
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -198,6 +198,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
}
#[allow(clippy::panic)]
async fn disconnect_sender(&mut self) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_none() {
@@ -208,6 +209,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
guard.message_sender = None;
}
#[allow(clippy::panic)]
async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_some() {
@@ -599,14 +601,20 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
spawn_future(async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver.run().await;
});
spawn_future!(
async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
},
"ReceivedMessagesBufferController::FragmentedMessageReceiver"
);
spawn_future!(
async move {
request_receiver.run().await;
},
"ReceivedMessagesBufferController::RequestReceiver"
);
}
}
@@ -155,8 +155,9 @@ where
data: Vec<Arc<PendingAcknowledgement>>,
) {
trace!("re-inserting pending retransmissions for {recipient}");
// the underlying entry MUST exist as we've just got data from there
// SAFETY: the underlying entry MUST exist as we've just got data from there
// and we hold a mut reference
#[allow(clippy::expect_used)]
let map_entry = &mut self
.surb_senders
.get_mut(recipient)
@@ -429,6 +430,7 @@ where
.pop_at_most_n_next_messages_at_random(amount)
}
#[allow(clippy::panic)]
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
@@ -165,9 +165,12 @@ impl StatisticsControl {
}
pub(crate) fn start(mut self) {
spawn_future(async move {
self.run().await;
})
spawn_future!(
async move {
self.run().await;
},
"StatisticsControl"
)
}
pub(crate) fn create_and_start(
@@ -126,7 +126,7 @@ impl TopologyAccessor {
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
@@ -145,36 +145,39 @@ impl TopologyRefresher {
}
pub fn start(mut self) {
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
spawn_future!(
async move {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
self.refresh_rate,
));
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.refresh_rate),
);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
})
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
},
"TopologyRefresher"
)
}
}
+15 -1
View File
@@ -7,7 +7,9 @@ use nym_gateway_client::error::GatewayClientError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -230,7 +232,19 @@ pub enum ClientCoreError {
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError {},
HkdfDerivationError,
#[error("missing url for constructing RPC client")]
RpcClientMissingUrl,
#[error("provided nym network details were malformed: {source}")]
InvalidNetworkDetails { source: NyxdError },
#[error("failed to construct RPC client: {source}")]
RpcClientCreationFailure { source: NyxdError },
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
}
impl From<tungstenite::Error> for ClientCoreError {
+2 -2
View File
@@ -148,7 +148,7 @@ async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, ClientCoreError>
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, ClientCoreError>
where
G: ConnectableGateway,
{
@@ -245,7 +245,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = gateways_with_latency.lock().await;
let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.expect("invalid selection weight!");
.map_err(|source| ClientCoreError::GatewaySelectionFailure { source })?;
info!(
"chose gateway {} with average latency of {:?}",
+38 -2
View File
@@ -18,18 +18,54 @@ pub use nym_topology::{
};
#[cfg(target_arch = "wasm32")]
pub(crate) fn spawn_future<F>(future: F)
pub fn spawn_future<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}
// TODO: expose similar API to the rest of the codebase,
// perhaps with some simple trait for a task to define its name
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn spawn_future<F>(future: F)
#[track_caller]
pub fn spawn_future<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(future);
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_named_future<F>(future: F, name: &str)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
cfg_if::cfg_if! {if #[cfg(tokio_unstable)] {
#[allow(clippy::expect_used)]
tokio::task::Builder::new().name(name).spawn(future).expect("failed to spawn future");
} else {
let _ = name;
tracing::debug!(r#"the underlying binary hasn't been built with `RUSTFLAGS="--cfg tokio_unstable"` - the future naming won't do anything"#);
spawn_future(future);
}}
}
#[macro_export]
macro_rules! spawn_future {
($future:expr) => {{
$crate::spawn_future($future)
}};
($future:expr, $name:expr) => {{
cfg_if::cfg_if! {if #[cfg(not(target_arch = "wasm32"))] {
$crate::spawn_named_future($future, $name)
} else {
let _ = $name;
$crate::spawn_future($future)
}}
}};
}
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attemting to establish connection to gateway at: {}",
"Attempting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
@@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
// check if the split stream didn't error out
let receive_res = stream_receiver
.try_recv()
.expect("stream sender was somehow dropped without sending anything!");
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;
if let Some(res) = receive_res {
let _res = res?;
@@ -719,10 +719,11 @@ impl NymApiClient {
pub async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<PartialExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.partial_expiration_date_signatures(expiration_date)
.partial_expiration_date_signatures(expiration_date, epoch_id)
.await?)
}
@@ -739,10 +740,11 @@ impl NymApiClient {
pub async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<AggregatedExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.global_expiration_date_signatures(expiration_date)
.global_expiration_date_signatures(expiration_date, epoch_id)
.await?)
}
@@ -6,16 +6,18 @@ use crate::nym_api::routes::{ecash, CORE_STATUS_COUNT, SINCE_ARG};
use async_trait::async_trait;
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashTicketVerificationResponse,
IssuedTicketbooksChallengeCommitmentRequest, IssuedTicketbooksChallengeCommitmentResponse,
IssuedTicketbooksDataRequest, IssuedTicketbooksDataResponse, IssuedTicketbooksForCountResponse,
IssuedTicketbooksForResponse, VerifyEcashTicketBody,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashSignerStatusResponse,
EcashTicketVerificationResponse, IssuedTicketbooksChallengeCommitmentRequest,
IssuedTicketbooksChallengeCommitmentResponse, IssuedTicketbooksDataRequest,
IssuedTicketbooksDataResponse, IssuedTicketbooksForCountResponse, IssuedTicketbooksForResponse,
VerifyEcashTicketBody,
};
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainStatusResponse,
KeyRotationInfoResponse, LegacyDescribedMixNode, NodePerformanceResponse, NodeRefreshBody,
NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainBlocksStatusResponse,
ChainStatusResponse, KeyRotationInfoResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
SignerInformationResponse,
};
use nym_api_requests::nym_nodes::{
NodesByAddressesRequestBody, NodesByAddressesResponse, PaginatedCachedNodesResponseV1,
@@ -1101,8 +1103,9 @@ pub trait NymApiClientExt: ApiClient {
async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<PartialExpirationDateSignatureResponse, NymAPIError> {
let params = match expiration_date {
let mut params = match expiration_date {
None => Vec::new(),
Some(exp) => vec![(
ecash::EXPIRATION_DATE_PARAM,
@@ -1110,6 +1113,10 @@ pub trait NymApiClientExt: ApiClient {
)],
};
if let Some(epoch_id) = epoch_id {
params.push((ecash::EPOCH_ID_PARAM, epoch_id.to_string()));
}
self.get_json(
&[
routes::V1_API_VERSION,
@@ -1146,8 +1153,9 @@ pub trait NymApiClientExt: ApiClient {
async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<AggregatedExpirationDateSignatureResponse, NymAPIError> {
let params = match expiration_date {
let mut params = match expiration_date {
None => Vec::new(),
Some(exp) => vec![(
ecash::EXPIRATION_DATE_PARAM,
@@ -1155,6 +1163,10 @@ pub trait NymApiClientExt: ApiClient {
)],
};
if let Some(epoch_id) = epoch_id {
params.push((ecash::EPOCH_ID_PARAM, epoch_id.to_string()));
}
self.get_json(
&[
routes::V1_API_VERSION,
@@ -1331,6 +1343,22 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_chain_blocks_status(&self) -> Result<ChainBlocksStatusResponse, NymAPIError> {
self.get_json("/v1/network/chain-blocks-status", NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_signer_status(&self) -> Result<EcashSignerStatusResponse, NymAPIError> {
self.get_json("/v1/ecash/signer-status", NO_PARAMS).await
}
#[instrument(level = "debug", skip(self))]
async fn get_signer_information(&self) -> Result<SignerInformationResponse, NymAPIError> {
self.get_json("/v1/api-status/signer-information", NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_key_rotation_info(&self) -> Result<KeyRotationInfoResponse, NymAPIError> {
self.get_json(
@@ -8,11 +8,11 @@ use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use cosmwasm_std::Addr;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
use serde::Deserialize;
use tracing::trace;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
pub use nym_coconut_dkg_common::{
dealer::{DealerDetailsResponse, PagedDealerIndexResponse, PagedDealerResponse},
dealing::{
@@ -21,7 +21,9 @@ pub use nym_coconut_dkg_common::{
},
msg::QueryMsg as DkgQueryMsg,
types::{DealerDetails, DealingIndex, Epoch, EpochId, EpochState, State},
verification_key::{ContractVKShare, PagedVKSharesResponse, VkShareResponse},
verification_key::{
ContractVKShare, PagedVKSharesResponse, VerificationKeyShare, VkShareResponse,
},
};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -139,12 +139,22 @@ impl NyxdClient<HttpClient> {
})
}
pub fn connect_with_network_details<U>(
endpoint: U,
network_details: NymNetworkDetails,
) -> Result<QueryHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&network_details)?;
Self::connect(config, endpoint)
}
pub fn connect_to_default_env<U>(endpoint: U) -> Result<QueryHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&NymNetworkDetails::new_from_env())?;
Self::connect(config, endpoint)
Self::connect_with_network_details(endpoint, NymNetworkDetails::new_from_env())
}
}
+1 -1
View File
@@ -86,7 +86,7 @@ pub async fn execute(args: Args) -> anyhow::Result<()> {
anyhow!("ticketbook got incorrectly imported - the master verification key is missing")
})?;
let expiration_signatures = persistent_storage
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id)
.await?
.ok_or_else(|| {
anyhow!(
@@ -120,7 +120,7 @@ async fn issue_to_file(args: Args, client: SigningClient) -> anyhow::Result<()>
if args.include_expiration_date_signatures {
let signatures = credentials_store
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id)
.await?
.ok_or(anyhow!("missing expiration date signatures!"))?;
@@ -241,7 +241,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
let node_id = row.node_id.clone().parse::<u32>().unwrap();
let coins: Vec<Coin> = vec![];
undelegation_msgs.push((ExecuteMsg::Undelegate { node_id }, coins));
undelegation_table.add_row(&[row.node_id.clone()]);
undelegation_table.add_row(std::slice::from_ref(&row.node_id));
if row.amount.amount > 0 {
delegation_msgs
@@ -188,7 +188,7 @@ impl<C> ContractTesterBuilder<C> {
*self.app.api()
}
pub fn querier(&self) -> QuerierWrapper {
pub fn querier(&self) -> QuerierWrapper<'_> {
self.app.wrap()
}
}
@@ -57,7 +57,7 @@ pub trait NodeBond {
fn is_unbonding(&self) -> bool;
fn identity(&self) -> IdentityKeyRef;
fn identity(&self) -> IdentityKeyRef<'_>;
fn original_pledge(&self) -> &Coin;
@@ -125,7 +125,7 @@ impl NodeBond for MixNodeBond {
self.is_unbonding
}
fn identity(&self) -> IdentityKeyRef {
fn identity(&self) -> IdentityKeyRef<'_> {
self.identity()
}
@@ -178,7 +178,7 @@ impl NodeBond for NymNodeBond {
self.is_unbonding
}
fn identity(&self) -> IdentityKeyRef {
fn identity(&self) -> IdentityKeyRef<'_> {
self.identity()
}
@@ -58,7 +58,7 @@ impl<'a> PrimaryKey<'a> for Role {
type Suffix = <u8 as PrimaryKey<'a>>::Suffix;
type SuperSuffix = <u8 as PrimaryKey<'a>>::SuperSuffix;
fn key(&self) -> Vec<Key> {
fn key(&self) -> Vec<Key<'_>> {
// I'm not sure why it wasn't possible to delegate the call to
// `(*self as u8).key()` directly...
// I guess because of the `Key::Ref(&'a [u8])` variant?
@@ -0,0 +1,123 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
-- 1. add temporary `epoch_id` column
ALTER TABLE pending_issuance
ADD COLUMN epoch_id INTEGER;
-- 2. populate the value
UPDATE pending_issuance
SET epoch_id = (SELECT epoch_id
FROM expiration_date_signatures
WHERE expiration_date_signatures.expiration_date = pending_issuance.expiration_date);
-- 3. create new expiration_date_signatures table (with changed constraints)
CREATE TABLE expiration_date_signatures_new
(
expiration_date DATE NOT NULL,
epoch_id INTEGER NOT NULL,
serialization_revision INTEGER NOT NULL,
-- combined signatures for all tuples issued for given day
serialised_signatures BLOB NOT NULL,
PRIMARY KEY (epoch_id, expiration_date)
);
-- 4. migrate the data
INSERT INTO expiration_date_signatures_new (expiration_date, epoch_id, serialization_revision, serialised_signatures)
SELECT expiration_date, epoch_id, serialization_revision, serialised_signatures
FROM expiration_date_signatures;
-- 5. drop and recreate the table references (due to new FK)
-- 5.1.
-- (data for ticketbooks that have an associated deposit, but failed to get issued)
CREATE TABLE pending_issuance_new
(
deposit_id INTEGER NOT NULL PRIMARY KEY,
-- introduce a way for us to introduce breaking changes in serialization of data
serialization_revision INTEGER NOT NULL,
pending_ticketbook_data BLOB NOT NULL UNIQUE,
-- for each ticketbook we MUST have corresponding expiration date signatures
expiration_date DATE NOT NULL,
epoch_id INTEGER NOT NULL,
-- for each ticketbook we MUST have corresponding expiration date signatures
FOREIGN KEY (epoch_id, expiration_date) REFERENCES expiration_date_signatures_new (epoch_id, expiration_date)
);
INSERT INTO pending_issuance_new (deposit_id, serialization_revision, pending_ticketbook_data, expiration_date,
epoch_id)
SELECT deposit_id, serialization_revision, pending_ticketbook_data, expiration_date, epoch_id
FROM pending_issuance;
-- 5.2.
CREATE TABLE ecash_ticketbook_new
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
-- introduce a way for us to introduce breaking changes in serialization of data
serialization_revision INTEGER NOT NULL,
-- the type of the associated ticketbook
ticketbook_type TEXT NOT NULL,
-- the actual crypto data of the ticketbook (wallet, keys, etc.)
ticketbook_data BLOB NOT NULL UNIQUE,
-- for each ticketbook we MUST have corresponding expiration date signatures
expiration_date DATE NOT NULL,
-- for each ticketbook we MUST have corresponding coin index signatures
epoch_id INTEGER NOT NULL,
-- the initial number of tickets the wallet has been created for
total_tickets INTEGER NOT NULL,
-- how many tickets have been used so far (the `l` value of the wallet)
used_tickets INTEGER NOT NULL,
-- FOREIGN KEYS:
-- for each ticketbook we MUST have corresponding coin index signatures
FOREIGN KEY (epoch_id) REFERENCES coin_indices_signatures (epoch_id),
-- for each ticketbook we MUST have corresponding expiration date signatures
FOREIGN KEY (expiration_date, epoch_id) REFERENCES expiration_date_signatures_new (expiration_date, epoch_id)
);
INSERT INTO ecash_ticketbook_new (id, serialization_revision, ticketbook_type, ticketbook_data, expiration_date,
epoch_id, total_tickets, used_tickets)
SELECT id,
serialization_revision,
ticketbook_type,
ticketbook_data,
expiration_date,
epoch_id,
total_tickets,
used_tickets
FROM ecash_ticketbook;
-- 6. finally swap out the old tables
-- drop old tables
DROP TABLE expiration_date_signatures;
DROP TABLE pending_issuance;
DROP TABLE ecash_ticketbook;
-- rename new tables
ALTER TABLE expiration_date_signatures_new
RENAME TO expiration_date_signatures;
ALTER TABLE pending_issuance_new
RENAME TO pending_issuance;
ALTER TABLE ecash_ticketbook_new
RENAME TO ecash_ticketbook;
@@ -28,7 +28,7 @@ struct EcashCredentialManagerInner {
pending: HashMap<i64, RetrievedPendingTicketbook>,
master_vk: HashMap<u64, VerificationKeyAuth>,
coin_indices_sigs: HashMap<u64, Vec<AnnotatedCoinIndexSignature>>,
expiration_date_sigs: HashMap<Date, Vec<AnnotatedExpirationDateSignature>>,
expiration_date_sigs: HashMap<(u64, Date), Vec<AnnotatedExpirationDateSignature>>,
_next_id: i64,
}
@@ -242,10 +242,14 @@ impl MemoryEcachTicketbookManager {
pub(crate) async fn get_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: u64,
) -> Option<Vec<AnnotatedExpirationDateSignature>> {
let guard = self.inner.read().await;
guard.expiration_date_sigs.get(&expiration_date).cloned()
guard
.expiration_date_sigs
.get(&(epoch_id, expiration_date))
.cloned()
}
pub(crate) async fn insert_expiration_date_signatures(
@@ -254,8 +258,9 @@ impl MemoryEcachTicketbookManager {
) {
let mut guard = self.inner.write().await;
guard
.expiration_date_sigs
.insert(sigs.expiration_date, sigs.signatures.clone());
guard.expiration_date_sigs.insert(
(sigs.epoch_id, sigs.expiration_date),
sigs.signatures.clone(),
);
}
}
@@ -39,7 +39,7 @@ impl SqliteEcashTicketbookManager {
Ok(())
}
pub(crate) async fn begin_storage_tx(&self) -> Result<Transaction<Sqlite>, sqlx::Error> {
pub(crate) async fn begin_storage_tx(&self) -> Result<Transaction<'_, Sqlite>, sqlx::Error> {
self.connection_pool.begin().await
}
@@ -260,15 +260,17 @@ impl SqliteEcashTicketbookManager {
pub(crate) async fn get_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: i64,
) -> Result<Option<RawExpirationDateSignatures>, sqlx::Error> {
sqlx::query_as!(
RawExpirationDateSignatures,
r#"
SELECT epoch_id as "epoch_id: u32", serialised_signatures, serialization_revision as "serialization_revision: u8"
SELECT serialised_signatures, serialization_revision as "serialization_revision: u8"
FROM expiration_date_signatures
WHERE expiration_date = ?
WHERE expiration_date = ? AND epoch_id = ?
"#,
expiration_date
expiration_date,
epoch_id
)
.fetch_optional(&*self.connection_pool)
.await
@@ -166,10 +166,11 @@ impl Storage for EphemeralStorage {
async fn get_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: u64,
) -> Result<Option<Vec<AnnotatedExpirationDateSignature>>, Self::StorageError> {
Ok(self
.storage_manager
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id)
.await)
}
-1
View File
@@ -60,7 +60,6 @@ pub struct StoredPendingTicketbook {
#[cfg_attr(not(target_arch = "wasm32"), derive(sqlx::FromRow))]
pub struct RawExpirationDateSignatures {
pub epoch_id: u32,
pub serialised_signatures: Vec<u8>,
pub serialization_revision: u8,
}
@@ -325,10 +325,11 @@ impl Storage for PersistentStorage {
async fn get_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: u64,
) -> Result<Option<Vec<AnnotatedExpirationDateSignature>>, Self::StorageError> {
let Some(raw) = self
.storage_manager
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id as i64)
.await?
else {
return Ok(None);
+1
View File
@@ -92,6 +92,7 @@ pub trait Storage: Clone + Send + Sync {
async fn get_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: u64,
) -> Result<Option<Vec<AnnotatedExpirationDateSignature>>, Self::StorageError>;
async fn insert_expiration_date_signatures(
@@ -39,7 +39,7 @@ impl traits::EcashManager for EcashManager {
async fn verification_key(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError> {
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, EcashTicketError> {
self.shared_state.verification_key(epoch_id).await
}
@@ -231,7 +231,7 @@ impl traits::EcashManager for MockEcashManager {
async fn verification_key(
&self,
_epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError> {
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, EcashTicketError> {
Ok(self.verfication_key.read().await)
}
@@ -125,7 +125,7 @@ impl SharedState {
async fn set_epoch_data(
&self,
epoch_id: EpochId,
) -> Result<RwLockWriteGuard<BTreeMap<EpochId, EpochState>>, EcashTicketError> {
) -> Result<RwLockWriteGuard<'_, BTreeMap<EpochId, EpochState>>, EcashTicketError> {
let Some(threshold) = self.threshold(epoch_id).await? else {
return Err(EcashTicketError::DKGThresholdUnavailable { epoch_id });
};
@@ -186,7 +186,7 @@ impl SharedState {
pub(crate) async fn api_clients(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<Vec<EcashApiClient>>, EcashTicketError> {
) -> Result<RwLockReadGuard<'_, Vec<EcashApiClient>>, EcashTicketError> {
let guard = self.epoch_data.read().await;
// the key was already in the map
@@ -212,7 +212,7 @@ impl SharedState {
pub(crate) async fn verification_key(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError> {
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, EcashTicketError> {
let guard = self.epoch_data.read().await;
// the key was already in the map
@@ -235,11 +235,11 @@ impl SharedState {
}))
}
pub(crate) async fn start_tx(&self) -> RwLockWriteGuard<DirectSigningHttpRpcNyxdClient> {
pub(crate) async fn start_tx(&self) -> RwLockWriteGuard<'_, DirectSigningHttpRpcNyxdClient> {
self.nyxd_client.write().await
}
pub(crate) async fn start_query(&self) -> RwLockReadGuard<DirectSigningHttpRpcNyxdClient> {
pub(crate) async fn start_query(&self) -> RwLockReadGuard<'_, DirectSigningHttpRpcNyxdClient> {
self.nyxd_client.read().await
}
@@ -12,7 +12,7 @@ pub trait EcashManager {
async fn verification_key(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError>;
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, EcashTicketError>;
fn storage(&self) -> Box<dyn BandwidthGatewayStorage + Send + Sync>;
async fn check_payment(
&self,
+1
View File
@@ -15,6 +15,7 @@ bls12_381 = { workspace = true, default-features = false }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
time = { workspace = true, features = ["serde"] }
utoipa = { workspace = true }
rand = { workspace = true }
+3 -3
View File
@@ -229,9 +229,9 @@ impl From<PayInfo> for NymPayInfo {
Serialize,
Deserialize,
Hash,
strum::Display,
strum::EnumString,
strum::EnumIter,
strum_macros::Display,
strum_macros::EnumString,
strum_macros::EnumIter,
)]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
+1 -1
View File
@@ -51,7 +51,7 @@ pub async fn obtain_expiration_date_signatures(
for ecash_api_client in ecash_api_clients.iter() {
match ecash_api_client
.api_client
.partial_expiration_date_signatures(None)
.partial_expiration_date_signatures(None, None)
.await
{
Ok(signature) => {
@@ -0,0 +1,27 @@
[package]
name = "nym-ecash-signer-check-types"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
semver = { workspace = true }
serde = { workspace = true, features = ["derive"] }
url = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true }
nym-coconut-dkg-common = { path = "../cosmwasm-smart-contracts/coconut-dkg" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
[lints]
workspace = true
@@ -0,0 +1,97 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_coconut_dkg_common::dealer::DealerDetails;
use nym_coconut_dkg_common::verification_key::{ContractVKShare, VerificationKeyShare};
use nym_crypto::asymmetric::ed25519;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use url::Url;
use utoipa::ToSchema;
#[derive(Debug, Error)]
pub enum MalformedDealer {
#[error("dealer at {dealer_url} has provided invalid ed25519 pubkey: {source}")]
InvalidDealerPubkey {
dealer_url: String,
source: Ed25519RecoveryError,
},
#[error("dealer at {dealer_url} has provided invalid announce url: {source}")]
InvalidDealerAddress {
dealer_url: String,
source: url::ParseError,
},
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub struct RawDealerInformation {
pub announce_address: String,
pub owner_address: String,
pub node_index: u64,
pub public_key: String,
pub verification_key_share: Option<VerificationKeyShare>,
pub share_verified: bool,
}
impl RawDealerInformation {
pub fn new(
dealer_details: &DealerDetails,
contract_share: Option<&ContractVKShare>,
) -> RawDealerInformation {
RawDealerInformation {
announce_address: dealer_details.announce_address.clone(),
owner_address: dealer_details.address.to_string(),
node_index: dealer_details.assigned_index,
public_key: dealer_details.ed25519_identity.clone(),
verification_key_share: contract_share.map(|s| s.share.clone()),
share_verified: contract_share.map(|s| s.verified).unwrap_or(false),
}
}
pub fn parse(&self) -> Result<DealerInformation, MalformedDealer> {
Ok(DealerInformation {
announce_address: self.announce_address.parse().map_err(|source| {
MalformedDealer::InvalidDealerAddress {
dealer_url: self.announce_address.clone(),
source,
}
})?,
owner_address: self.owner_address.clone(),
node_index: self.node_index,
public_key: self.public_key.parse().map_err(|source| {
MalformedDealer::InvalidDealerPubkey {
dealer_url: self.announce_address.clone(),
source,
}
})?,
verification_key_share: self.verification_key_share.clone(),
share_verified: self.share_verified,
})
}
}
#[derive(Debug)]
pub struct DealerInformation {
pub announce_address: Url,
pub owner_address: String,
pub node_index: u64,
pub public_key: ed25519::PublicKey,
// no need to parse it into the full type as it doesn't get us anything
pub verification_key_share: Option<VerificationKeyShare>,
pub share_verified: bool,
}
impl From<DealerInformation> for RawDealerInformation {
fn from(d: DealerInformation) -> Self {
RawDealerInformation {
announce_address: d.announce_address.to_string(),
owner_address: d.owner_address,
node_index: d.node_index,
public_key: d.public_key.to_base58_string(),
verification_key_share: d.verification_key_share,
share_verified: d.share_verified,
}
}
}
@@ -0,0 +1,127 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_coconut_dkg_common::types::EpochId;
use nym_coconut_dkg_common::verification_key::VerificationKeyShare;
use nym_crypto::asymmetric::ed25519;
use std::time::Duration;
use time::OffsetDateTime;
use tracing::{debug, warn};
pub trait Verifiable {
fn verify_signature(&self, pub_key: &ed25519::PublicKey) -> bool;
}
pub trait TimestampedResponse {
fn timestamp(&self) -> OffsetDateTime;
}
pub trait LegacyChainResponse {
fn chain_synced(&self, now: OffsetDateTime, stall_threshold: Duration) -> bool;
}
pub trait ChainResponse: Verifiable + TimestampedResponse {
fn chain_synced(&self) -> bool;
fn chain_available(
&self,
pub_key: &ed25519::PublicKey,
now: OffsetDateTime,
stale_response_threshold: Duration,
) -> bool {
if !self.verify_signature(pub_key) {
warn!("failed signature verification on chain status response");
return false;
}
// we rely on information provided from the api itself AS LONG AS it's not too outdated
if self.timestamp() + stale_response_threshold < now {
return false;
}
self.chain_synced()
}
}
pub trait LegacySignerResponse {
fn signer_identity(&self) -> &str;
fn signer_verification_key(&self) -> &Option<String>;
fn unprovable_signing_available(
&self,
pub_key: &ed25519::PublicKey,
expected_verification_key: Option<VerificationKeyShare>,
share_verified: bool,
) -> bool {
if self.signer_identity() != pub_key.to_base58_string() {
warn!("mismatched identity key on the legacy response");
return false;
}
// the contract share hasn't been verified yet, so we're probably in the middle of DKG
// thus if there's a bit of desync in the state, it's fine
if !share_verified {
return true;
}
if self.signer_verification_key() != &expected_verification_key {
warn!("mismatched [ecash] verification key on the legacy response");
return false;
}
true
}
}
pub trait SignerResponse: Verifiable + TimestampedResponse {
fn has_signing_keys(&self) -> bool;
fn signer_disabled(&self) -> bool;
fn is_ecash_signer(&self) -> bool;
fn dkg_ecash_epoch_id(&self) -> EpochId;
fn provable_signing_available(
&self,
pub_key: &ed25519::PublicKey,
dkg_epoch_id: EpochId,
now: OffsetDateTime,
stale_response_threshold: Duration,
) -> bool {
if !self.verify_signature(pub_key) {
warn!("failed signature verification on chain status response");
return false;
}
// we rely on information provided from the api itself AS LONG AS it's not too outdated
if self.timestamp() + stale_response_threshold < now {
return false;
}
if !self.has_signing_keys() {
debug!("missing signing keys");
return false;
}
if self.signer_disabled() {
debug!("signer functionalities explicitly disabled");
return false;
}
if !self.is_ecash_signer() {
debug!("signer doesn't recognise it's a signer for this epoch");
return false;
}
if dkg_epoch_id != self.dkg_ecash_epoch_id() {
debug!(
"mismatched dkg epoch id. current: {dkg_epoch_id}, signer's: {}",
self.dkg_ecash_epoch_id()
);
return false;
}
true
}
}
@@ -0,0 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod dealer_information;
pub mod helper_traits;
pub mod status;
@@ -0,0 +1,303 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::dealer_information::RawDealerInformation;
use crate::helper_traits::{
ChainResponse, LegacyChainResponse, LegacySignerResponse, SignerResponse,
};
use nym_coconut_dkg_common::types::EpochId;
use nym_coconut_dkg_common::verification_key::VerificationKeyShare;
use nym_crypto::asymmetric::ed25519;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
use utoipa::ToSchema;
pub(crate) const CHAIN_STALL_THRESHOLD: Duration = Duration::from_secs(5 * 60);
pub(crate) const STALE_RESPONSE_THRESHOLD: Duration = Duration::from_secs(5 * 60);
// the reason for generics is not to remove duplication of code,
// but because without them, we'd be having problems with circular dependencies,
// i.e. nym-api-requests depending on ecash-signer-check-types and
// ecash-signer-check-types needing nym-api-requests
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub enum Status<L, T> {
/// The API, even though it reports correct version, did not response to the status query
Unreachable,
/// The API is running an outdated version that does not expose the required endpoint
Outdated,
/// Response to the legacy (unsigned) status query
ReachableLegacy { response: Box<L> },
/// Response to the current (signed) status query
Reachable { response: Box<T> },
}
impl<L, T> Status<L, T>
where
L: LegacyChainResponse,
T: ChainResponse,
{
pub fn chain_available(&self, pub_key: ed25519::PublicKey) -> bool {
let now = OffsetDateTime::now_utc();
match self {
Status::Unreachable | Status::Outdated => false,
Status::ReachableLegacy { response } => {
response.chain_synced(now, CHAIN_STALL_THRESHOLD)
}
Status::Reachable { response } => {
response.chain_available(&pub_key, now, STALE_RESPONSE_THRESHOLD)
}
}
}
pub fn chain_provably_stalled(&self, pub_key: ed25519::PublicKey) -> bool {
let now = OffsetDateTime::now_utc();
match self {
Status::Unreachable | Status::Outdated | Status::ReachableLegacy { .. } => false,
Status::Reachable { response } => {
!response.chain_available(&pub_key, now, STALE_RESPONSE_THRESHOLD)
}
}
}
pub fn chain_unprovably_stalled(&self) -> bool {
let now = OffsetDateTime::now_utc();
match self {
Status::Unreachable | Status::Outdated | Status::Reachable { .. } => false,
Status::ReachableLegacy { response } => {
!response.chain_synced(now, CHAIN_STALL_THRESHOLD)
}
}
}
}
impl<L, T> Status<L, T>
where
L: LegacySignerResponse,
T: SignerResponse,
{
pub fn signing_available(
&self,
pub_key: ed25519::PublicKey,
dkg_epoch_id: u64,
expected_verification_key: Option<VerificationKeyShare>,
share_verified: bool,
) -> bool {
let now = OffsetDateTime::now_utc();
match self {
Status::Unreachable | Status::Outdated => false,
Status::ReachableLegacy { response } => response.unprovable_signing_available(
&pub_key,
expected_verification_key,
share_verified,
),
Status::Reachable { response } => response.provable_signing_available(
&pub_key,
dkg_epoch_id,
now,
STALE_RESPONSE_THRESHOLD,
),
}
}
pub fn signing_provably_unavailable(
&self,
pub_key: ed25519::PublicKey,
dkg_epoch_id: EpochId,
) -> bool {
let now = OffsetDateTime::now_utc();
match self {
Status::Unreachable | Status::Outdated | Status::ReachableLegacy { .. } => false,
Status::Reachable { response } => !response.provable_signing_available(
&pub_key,
dkg_epoch_id,
now,
STALE_RESPONSE_THRESHOLD,
),
}
}
pub fn signing_unprovably_unavailable(
&self,
pub_key: ed25519::PublicKey,
expected_verification_key: Option<VerificationKeyShare>,
share_verified: bool,
) -> bool {
match self {
Status::Unreachable | Status::Outdated | Status::Reachable { .. } => false,
Status::ReachableLegacy { response } => !response.unprovable_signing_available(
&pub_key,
expected_verification_key,
share_verified,
),
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub struct SignerResult<LS, TS, LC, TC> {
pub dkg_epoch_id: u64,
pub information: RawDealerInformation,
pub status: SignerStatus<LS, TS, LC, TC>,
}
impl<LS, TS, LC, TC> SignerResult<LS, TS, LC, TC> {
pub fn signer_unreachable(&self) -> bool {
matches!(self.status, SignerStatus::Unreachable)
}
pub fn malformed_details(&self) -> bool {
self.information.parse().is_err()
}
}
impl<LS, TS, LC, TC> SignerResult<LS, TS, LC, TC>
where
LC: LegacyChainResponse,
TC: ChainResponse,
{
pub fn unknown_chain_status(&self) -> bool {
let Ok(_) = self.information.parse() else {
return true;
};
if let SignerStatus::Tested { .. } = &self.status {
return false;
}
true
}
pub fn chain_available(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result
.local_chain_status
.chain_available(parsed_info.public_key)
}
pub fn chain_provably_stalled(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result
.local_chain_status
.chain_provably_stalled(parsed_info.public_key)
}
pub fn chain_unprovably_stalled(&self) -> bool {
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result.local_chain_status.chain_unprovably_stalled()
}
}
impl<LS, TS, LC, TC> SignerResult<LS, TS, LC, TC>
where
LS: LegacySignerResponse,
TS: SignerResponse,
{
pub fn unknown_signing_status(&self) -> bool {
let Ok(_) = self.information.parse() else {
return true;
};
if let SignerStatus::Tested { .. } = &self.status {
return false;
}
true
}
pub fn signing_available(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result.signing_status.signing_available(
parsed_info.public_key,
self.dkg_epoch_id,
parsed_info.verification_key_share,
parsed_info.share_verified,
)
}
pub fn signing_provably_unavailable(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result
.signing_status
.signing_provably_unavailable(parsed_info.public_key, self.dkg_epoch_id)
}
pub fn signing_unprovably_unavailable(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result.signing_status.signing_unprovably_unavailable(
parsed_info.public_key,
parsed_info.verification_key_share,
parsed_info.share_verified,
)
}
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub enum SignerStatus<LS, TS, LC, TC> {
Unreachable,
ProvidedInvalidDetails,
Tested {
result: SignerTestResult<LS, TS, LC, TC>,
},
}
impl<LS, TS, LC, TC> SignerStatus<LS, TS, LC, TC> {
pub fn with_details(
self,
information: impl Into<RawDealerInformation>,
dkg_epoch_id: u64,
) -> SignerResult<LS, TS, LC, TC> {
SignerResult {
dkg_epoch_id,
status: self,
information: information.into(),
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
pub struct SignerTestResult<LS, TS, LC, TC> {
pub reported_version: String,
pub signing_status: Status<LS, TS>,
pub local_chain_status: Status<LC, TC>,
}
+27
View File
@@ -0,0 +1,27 @@
[package]
name = "nym-ecash-signer-check"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
futures = { workspace = true }
thiserror = { workspace = true }
semver = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tracing = { workspace = true }
url = { workspace = true }
nym-validator-client = { path = "../client-libs/validator-client" }
nym-network-defaults = { path = "../network-defaults" }
nym-ecash-signer-check-types = { path = "../ecash-signer-check-types" }
[lints]
workspace = true
@@ -0,0 +1,225 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{LocalChainStatus, SigningStatus, TypedSignerResult};
use nym_ecash_signer_check_types::dealer_information::RawDealerInformation;
use nym_ecash_signer_check_types::status::{SignerStatus, SignerTestResult};
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::BinaryBuildInformationOwned;
use nym_validator_client::nyxd::contract_traits::dkg_query_client::{
ContractVKShare, DealerDetails,
};
use nym_validator_client::NymApiClient;
use std::time::Duration;
use tracing::{error, warn};
use url::Url;
pub(crate) mod chain_status {
// Dorina
pub(crate) const MINIMUM_VERSION_LEGACY: semver::Version = semver::Version::new(1, 1, 51);
// Gruyere
pub(crate) const MINIMUM_VERSION: semver::Version = semver::Version::new(1, 1, 64);
}
pub(crate) mod signing_status {
// Magura (possibly earlier)
pub(crate) const MINIMUM_LEGACY_VERSION: semver::Version = semver::Version::new(1, 1, 46);
// Gruyere
pub(crate) const MINIMUM_VERSION: semver::Version = semver::Version::new(1, 1, 64);
}
struct ClientUnderTest {
api_client: NymApiClient,
build_info: Option<BinaryBuildInformationOwned>,
}
impl ClientUnderTest {
pub(crate) fn new(api_url: &Url) -> Self {
ClientUnderTest {
api_client: NymApiClient::new(api_url.clone()),
build_info: None,
}
}
pub(crate) async fn try_retrieve_build_information(&mut self) -> bool {
match tokio::time::timeout(
Duration::from_secs(5),
self.api_client.nym_api.build_information(),
)
.await
{
Ok(Ok(build_information)) => {
self.build_info = Some(build_information);
true
}
Ok(Err(err)) => {
warn!("{}: failed to retrieve build information: {err}. the signer is most likely down", self.api_client.api_url());
false
}
Err(_timeout) => {
warn!(
"{}: timed out while attempting to retrieve build information",
self.api_client.api_url()
);
false
}
}
}
pub(crate) fn version(&self) -> Option<semver::Version> {
self.build_info.as_ref().and_then(|build_info| {
build_info
.build_version
.parse()
.inspect_err(|err| {
error!(
"ecash signer '{}' reports invalid version {}: {err}",
self.api_client.api_url(),
build_info.build_version
)
})
.ok()
})
}
pub(crate) fn supports_legacy_signing_status_query(&self) -> bool {
let Some(version) = self.version() else {
return false;
};
version >= signing_status::MINIMUM_LEGACY_VERSION
}
pub(crate) fn supports_signing_status_query(&self) -> bool {
let Some(version) = self.version() else {
return false;
};
version >= signing_status::MINIMUM_VERSION
}
pub(crate) fn supports_chain_status_query(&self) -> bool {
let Some(version) = self.version() else {
return false;
};
version >= chain_status::MINIMUM_VERSION
}
pub(crate) fn supports_legacy_chain_status_query(&self) -> bool {
let Some(version) = self.version() else {
return false;
};
version >= chain_status::MINIMUM_VERSION_LEGACY
}
pub(crate) async fn check_local_chain(&self) -> LocalChainStatus {
// check if it at least supports legacy query
if !self.supports_legacy_chain_status_query() {
return LocalChainStatus::Outdated;
}
// check if it supports the current query
if self.supports_chain_status_query() {
return match self.api_client.nym_api.get_chain_blocks_status().await {
Ok(status) => LocalChainStatus::Reachable {
response: Box::new(status),
},
Err(err) => {
warn!(
"{}: failed to retrieve local chain status: {err}",
self.api_client.api_url()
);
LocalChainStatus::Unreachable
}
};
}
// fallback to the legacy query
match self.api_client.nym_api.get_chain_status().await {
Ok(status) => LocalChainStatus::ReachableLegacy {
response: Box::new(status),
},
Err(err) => {
warn!(
"{}: failed to retrieve [legacy] local chain status: {err}",
self.api_client.api_url()
);
LocalChainStatus::Unreachable
}
}
}
pub(crate) async fn check_signing_status(&self) -> SigningStatus {
// check if it at least supports legacy query
if !self.supports_legacy_signing_status_query() {
return SigningStatus::Outdated;
}
// check if it supports the current query
if self.supports_signing_status_query() {
return match self.api_client.nym_api.get_signer_status().await {
Ok(response) => SigningStatus::Reachable {
response: Box::new(response),
},
Err(err) => {
warn!(
"{}: failed to retrieve signer chain status: {err}",
self.api_client.api_url()
);
SigningStatus::Unreachable
}
};
}
// fallback to the legacy query
match self.api_client.nym_api.get_signer_information().await {
Ok(status) => SigningStatus::ReachableLegacy {
response: Box::new(status),
},
Err(err) => {
warn!(
"{}: failed to retrieve [legacy] signer chain status: {err}",
self.api_client.api_url()
);
// NOTE: this might equally mean the signing is disabled
SigningStatus::Unreachable
}
}
}
}
pub(crate) async fn check_client(
dealer_details: DealerDetails,
dkg_epoch: u64,
contract_share: Option<&ContractVKShare>,
) -> TypedSignerResult {
let dealer_information = RawDealerInformation::new(&dealer_details, contract_share);
// 7. attempt to construct client instances out of them
let Ok(parsed_information) = dealer_information.parse() else {
return SignerStatus::ProvidedInvalidDetails.with_details(dealer_information, dkg_epoch);
};
let mut client = ClientUnderTest::new(&parsed_information.announce_address);
// 8. check basic connection status - can you retrieve build information?
if !client.try_retrieve_build_information().await {
return SignerStatus::Unreachable.with_details(dealer_information, dkg_epoch);
}
// 9. check perceived chain status
let local_chain_status = client.check_local_chain().await;
// 10. check signer status
let signing_status = client.check_signing_status().await;
SignerStatus::Tested {
result: SignerTestResult {
reported_version: client.version().map(|v| v.to_string()).unwrap_or_default(),
signing_status,
local_chain_status,
},
}
.with_details(dealer_information, dkg_epoch)
}
@@ -0,0 +1,80 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::SignerCheckError;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nyxd::contract_traits::dkg_query_client::{
ContractVKShare, DealerDetails, VerificationKeyShare,
};
use url::Url;
#[derive(Debug)]
pub struct RawDealerInformation {
pub announce_address: String,
pub owner_address: String,
pub node_index: u64,
pub public_key: String,
pub verification_key_share: Option<VerificationKeyShare>,
pub share_verified: bool,
}
impl RawDealerInformation {
pub fn new(
dealer_details: &DealerDetails,
contract_share: Option<&ContractVKShare>,
) -> RawDealerInformation {
RawDealerInformation {
announce_address: dealer_details.announce_address.clone(),
owner_address: dealer_details.address.to_string(),
node_index: dealer_details.assigned_index,
public_key: dealer_details.ed25519_identity.clone(),
verification_key_share: contract_share.map(|s| s.share.clone()),
share_verified: contract_share.map(|s| s.verified).unwrap_or(false),
}
}
pub fn parse(&self) -> Result<DealerInformation, SignerCheckError> {
Ok(DealerInformation {
announce_address: self.announce_address.parse().map_err(|source| {
SignerCheckError::InvalidDealerAddress {
dealer_url: self.announce_address.clone(),
source,
}
})?,
owner_address: self.owner_address.clone(),
node_index: self.node_index,
public_key: self.announce_address.parse().map_err(|source| {
SignerCheckError::InvalidDealerPubkey {
dealer_url: self.announce_address.clone(),
source,
}
})?,
verification_key_share: self.verification_key_share.clone(),
share_verified: self.share_verified,
})
}
}
#[derive(Debug)]
pub struct DealerInformation {
pub announce_address: Url,
pub owner_address: String,
pub node_index: u64,
pub public_key: ed25519::PublicKey,
// no need to parse it into the full type as it doesn't get us anything
pub verification_key_share: Option<VerificationKeyShare>,
pub share_verified: bool,
}
impl From<DealerInformation> for RawDealerInformation {
fn from(d: DealerInformation) -> Self {
RawDealerInformation {
announce_address: d.announce_address.to_string(),
owner_address: d.owner_address,
node_index: d.node_index,
public_key: d.public_key.to_base58_string(),
verification_key_share: d.verification_key_share,
share_verified: d.share_verified,
}
}
}
+24
View File
@@ -0,0 +1,24 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_validator_client::nyxd::error::NyxdError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum SignerCheckError {
#[error("failed to connect to nyxd chain due to invalid connection details: {source}")]
InvalidNyxdConnectionDetails { source: NyxdError },
#[error("failed to query the DKG contract: {source}")]
DKGContractQueryFailure { source: NyxdError },
}
impl SignerCheckError {
pub fn invalid_nyxd_connection_details(source: NyxdError) -> Self {
Self::InvalidNyxdConnectionDetails { source }
}
pub fn dkg_contract_query_failure(source: NyxdError) -> Self {
Self::DKGContractQueryFailure { source }
}
}
+94
View File
@@ -0,0 +1,94 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client_check::check_client;
use futures::stream::{FuturesUnordered, StreamExt};
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::nyxd::contract_traits::{DkgQueryClient, PagedDkgQueryClient};
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::collections::HashMap;
use url::Url;
pub use error::SignerCheckError;
use nym_ecash_signer_check_types::status::{SignerResult, Status};
use nym_validator_client::ecash::models::EcashSignerStatusResponse;
use nym_validator_client::models::{
ChainBlocksStatusResponse, ChainStatusResponse, SignerInformationResponse,
};
mod client_check;
pub mod error;
pub type TypedSignerResult = SignerResult<
SignerInformationResponse,
EcashSignerStatusResponse,
ChainStatusResponse,
ChainBlocksStatusResponse,
>;
pub type LocalChainStatus = Status<ChainStatusResponse, ChainBlocksStatusResponse>;
pub type SigningStatus = Status<SignerInformationResponse, EcashSignerStatusResponse>;
pub struct SignersTestResult {
pub threshold: Option<u64>,
pub results: Vec<TypedSignerResult>,
}
pub async fn check_signers(
rpc_endpoint: Url,
// details such as denoms, prefixes, etc.
network_details: NymNetworkDetails,
) -> Result<SignersTestResult, SignerCheckError> {
// 1. create nyx client instance
let client = QueryHttpRpcNyxdClient::connect_with_network_details(
rpc_endpoint.as_str(),
network_details,
)
.map_err(SignerCheckError::invalid_nyxd_connection_details)?;
check_signers_with_client(&client).await
}
pub async fn check_signers_with_client<C>(client: &C) -> Result<SignersTestResult, SignerCheckError>
where
C: DkgQueryClient + Sync,
{
// 2. retrieve current dkg epoch
let dkg_epoch = client
.get_current_epoch()
.await
.map_err(SignerCheckError::dkg_contract_query_failure)?;
// 3. retrieve the dkg threshold as reference point
let threshold = client
.get_epoch_threshold(dkg_epoch.epoch_id)
.await
.map_err(SignerCheckError::dkg_contract_query_failure)?;
// 4. retrieve information on current DKG dealers (i.e. eligible signers)
let dealers = client
.get_all_current_dealers()
.await
.map_err(SignerCheckError::dkg_contract_query_failure)?;
// 5. retrieve their published keys (if available)
let shares: HashMap<_, _> = client
.get_all_verification_key_shares(dkg_epoch.epoch_id)
.await
.map_err(SignerCheckError::dkg_contract_query_failure)?
.into_iter()
.map(|share| (share.node_index, share))
.collect();
// 6. for each dealer attempt to perform the checks
let results = dealers
.into_iter()
.map(|d| {
let share = shares.get(&d.assigned_index);
check_client(d, dkg_epoch.epoch_id, share)
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;
Ok(SignersTestResult { threshold, results })
}
+73
View File
@@ -0,0 +1,73 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::chain_status::LocalChainStatus;
use crate::dealer_information::RawDealerInformation;
use crate::signing_status::SigningStatus;
use std::time::Duration;
pub(crate) const STALE_RESPONSE_THRESHOLD: Duration = Duration::from_secs(5 * 60);
#[derive(Debug)]
pub struct SignerResult {
pub dkg_epoch_id: u64,
pub information: RawDealerInformation,
pub status: SignerStatus,
}
impl SignerResult {
pub fn chain_available(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result.local_chain_status.available(parsed_info.public_key)
}
pub fn signer_available(&self) -> bool {
let Ok(parsed_info) = self.information.parse() else {
return false;
};
let SignerStatus::Tested { result } = &self.status else {
return false;
};
result.signing_status.available(
parsed_info.public_key,
self.dkg_epoch_id,
parsed_info.verification_key_share,
parsed_info.share_verified,
)
}
}
#[derive(Debug)]
pub enum SignerStatus {
Unreachable,
ProvidedInvalidDetails,
Tested { result: SignerTestResult },
}
impl SignerStatus {
pub fn with_details(
self,
information: impl Into<RawDealerInformation>,
dkg_epoch_id: u64,
) -> SignerResult {
SignerResult {
dkg_epoch_id,
status: self,
information: information.into(),
}
}
}
#[derive(Debug)]
pub struct SignerTestResult {
pub reported_version: String,
pub signing_status: SigningStatus,
pub local_chain_status: LocalChainStatus,
}
+3
View File
@@ -47,4 +47,7 @@ workspace = true
default-features = false
[dev-dependencies]
anyhow = { workspace = true }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" } # we need specific imports in tests
nym-test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["full"] }
@@ -109,3 +109,85 @@ GATEWAY -> CLIENT
DONE(status)
*/
#[cfg(test)]
mod tests {
use super::*;
use crate::ClientControlRequest;
use futures::StreamExt;
use nym_test_utils::helpers::u64_seeded_rng;
use nym_test_utils::mocks::stream_sink::mock_streams;
use nym_test_utils::traits::{Leak, Timeboxed, TimeboxedSpawnable};
use tokio::join;
use tungstenite::Message;
#[tokio::test]
async fn basic_handshake() -> anyhow::Result<()> {
use anyhow::Context as _;
// solve the lifetime issue by just leaking the contents of the boxes
// which is perfectly fine in test
let client_rng = u64_seeded_rng(42).leak();
let gateway_rng = u64_seeded_rng(69).leak();
let client_keys = ed25519::KeyPair::new(client_rng).leak();
let gateway_keys = ed25519::KeyPair::new(gateway_rng).leak();
let (client_ws, gateway_ws) = mock_streams::<Message>();
// we need streams that return Result<Message, WsError>
let client_ws = client_ws.map(Ok);
let gateway_ws = gateway_ws.map(Ok);
let client_ws = client_ws.leak();
let gateway_ws = gateway_ws.leak();
let handshake_client = client_handshake(
client_rng,
client_ws,
client_keys,
*gateway_keys.public_key(),
false,
true,
TaskClient::dummy(),
);
let client_fut = handshake_client.spawn_timeboxed();
// we need to receive the first message so that it could be propagated to the gateway side of the handshake
let ClientControlRequest::RegisterHandshakeInitRequest {
protocol_version: _,
data,
} = (gateway_ws.next())
.timeboxed()
.await
.context("timeout")?
.context("no message!")??
.into_text()?
.parse::<ClientControlRequest>()?
else {
panic!("bad message")
};
let init_msg = data;
let handshake_gateway = gateway_handshake(
gateway_rng,
gateway_ws,
gateway_keys,
init_msg,
TaskClient::dummy(),
);
let gateway_fut = handshake_gateway.spawn_timeboxed();
let (client, gateway) = join!(client_fut, gateway_fut);
let client_key = client???;
let gateway_key = gateway???;
// ensure the created keys are the same
assert_eq!(client_key, gateway_key);
Ok(())
}
}
+57 -28
View File
@@ -6,9 +6,9 @@
//! The resolver itself is the set combination of the google, cloudflare, and quad9 endpoints
//! supporting DoH and DoT.
//!
//! This resolver implements a fallback mechanism where, should the DNS-over-TLS resolution fail, a
//! This resolver supports a fallback mechanism where, should the DNS-over-TLS resolution fail, a
//! followup resolution will be done using the hosts configured default (e.g. `/etc/resolve.conf` on
//! linux).
//! linux). This is disabled by default and can be enabled using [`enable_system_fallback`].
//!
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the
//! `hickory-resolver` crate
@@ -93,14 +93,14 @@ pub struct HickoryDnsResolver {
// Tokio Runtime in initialization, so we must delay the actual
// construction of the resolver.
state: Arc<OnceCell<TokioResolver>>,
fallback: Arc<OnceCell<TokioResolver>>,
fallback: Option<Arc<OnceCell<TokioResolver>>>,
dont_use_shared: bool,
}
impl Resolve for HickoryDnsResolver {
fn resolve(&self, name: Name) -> Resolving {
let resolver = self.state.clone();
let fallback = self.fallback.clone();
let maybe_fallback = self.fallback.clone();
let independent = self.dont_use_shared;
Box::pin(async move {
let resolver = resolver.get_or_try_init(|| {
@@ -117,23 +117,30 @@ impl Resolve for HickoryDnsResolver {
let lookup = match resolver.lookup_ip(name.as_str()).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
if !e.is_no_records_found() {
warn!("primary DNS failed w/ error {e}: using system fallback");
}
let resolver = fallback.get_or_try_init(|| {
// using a closure here is slightly gross, but this makes sure that if the
// lazy-init returns an error it can be handled by the client
if independent {
new_resolver_system()
} else {
Ok(SHARED_RESOLVER
.fallback
.get_or_try_init(new_resolver_system)?
.clone())
if let Some(ref fallback) = maybe_fallback {
// on failure use the fall back system configured DNS resolver
if !e.is_no_records_found() {
warn!("primary DNS failed w/ error {e}: using system fallback");
}
})?;
resolver.lookup_ip(name.as_str()).await?
let resolver = fallback.get_or_try_init(|| {
// using a closure here is slightly gross, but this makes sure that if the
// lazy-init returns an error it can be handled by the client
if independent {
new_resolver_system()
} else {
Ok(SHARED_RESOLVER
.fallback
.as_ref()
.ok_or(e)? // if the shared resolver has no fallback return the original error
.get_or_try_init(new_resolver_system)?
.clone())
}
})?;
resolver.lookup_ip(name.as_str()).await?
} else {
return Err(e.into());
}
}
};
@@ -162,14 +169,17 @@ impl HickoryDnsResolver {
let lookup = match resolver.lookup_ip(name).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
if !e.is_no_records_found() {
warn!("primary DNS failed w/ error {e}: using system fallback");
if let Some(ref fallback) = self.fallback {
// on failure use the fall back system configured DNS resolver
if !e.is_no_records_found() {
warn!("primary DNS failed w/ error {e}: using system fallback");
}
let resolver = fallback.get_or_try_init(|| self.new_resolver_system())?;
resolver.lookup_ip(name).await?
} else {
return Err(e.into());
}
let resolver = self
.fallback
.get_or_try_init(|| self.new_resolver_system())?;
resolver.lookup_ip(name).await?
}
};
@@ -193,15 +203,34 @@ impl HickoryDnsResolver {
}
fn new_resolver_system(&self) -> Result<TokioResolver, HickoryDnsError> {
if self.dont_use_shared {
if self.dont_use_shared || SHARED_RESOLVER.fallback.is_none() {
new_resolver_system()
} else {
Ok(SHARED_RESOLVER
.fallback
.as_ref()
.unwrap()
.get_or_try_init(new_resolver_system)?
.clone())
}
}
/// Enable fallback to the system default resolver if the primary (DoX) resolver fails
pub fn enable_system_fallback(&mut self) -> Result<(), HickoryDnsError> {
self.fallback = Some(Default::default());
let _ = self
.fallback
.as_ref()
.unwrap()
.get_or_try_init(new_resolver_system)?;
Ok(())
}
/// Disable fallback resolution. If the primary resolver fails the error is
/// returned immediately
pub fn disable_system_fallback(&mut self) {
self.fallback = None;
}
}
/// Create a new resolver with a custom DoT based configuration. The options are overridden to look
+5
View File
@@ -173,6 +173,10 @@ mod path;
#[cfg(not(target_arch = "wasm32"))]
pub use dns::{HickoryDnsError, HickoryDnsResolver};
// helper for generating user agent based on binary information
#[doc(hidden)]
pub use nym_bin_common::bin_info;
/// Default HTTP request connection timeout.
///
/// The timeout is relatively high as we are often making requests over the mixnet, where latency is
@@ -608,6 +612,7 @@ impl Client {
current_idx: Arc::new(Default::default()),
reqwest_client: self.reqwest_client.clone(),
#[cfg(feature = "tunneling")]
front: self.front.clone(),
retry_limit: self.retry_limit,
+11 -1
View File
@@ -16,10 +16,20 @@ pub struct UserAgent {
pub version: String,
/// client platform
pub platform: String,
/// source commit version for the calling calling crate / subsystem
/// source commit version for the calling crate / subsystem
pub git_commit: String,
}
/// Create `UserAgent` based on the caller's crate information
// we can't use normal function as then `application` and `version` would correspond
// of that of `nym-http-api-client` lib
#[macro_export]
macro_rules! generate_user_agent {
() => {
$crate::UserAgent::from($crate::bin_info!())
};
}
#[derive(Clone, Debug, thiserror::Error)]
#[error("invalid user agent string: {0}")]
pub struct UserAgentError(String);
@@ -146,6 +146,12 @@ pub struct OutputParams {
pub output: Option<Output>,
}
impl OutputParams {
pub fn get_output(&self) -> Output {
self.output.unwrap_or_default()
}
}
impl Output {
pub fn to_response<T: Serialize>(self, data: T) -> FormattedResponse<T> {
match self {
+2 -2
View File
@@ -25,8 +25,8 @@ pub enum NymIdError {
#[error("attempted to import an expired credential (it expired on {expiration})")]
ExpiredCredentialImport { expiration: Date },
#[error("could not import ticketbook expiring at {date} since we do not have corresponding expiration date signatures")]
MissingExpirationDateSignatures { date: Date },
#[error("could not import ticketbook expiring at {date} for epoch {epoch_id} since we do not have corresponding expiration date signatures")]
MissingExpirationDateSignatures { date: Date, epoch_id: u64 },
#[error("could not import ticketbook for epoch {epoch_id} since we do not have corresponding coin index signatures")]
MissingCoinIndexSignatures { epoch_id: u64 },
@@ -99,7 +99,7 @@ where
// in order to import the ticketbook we MUST have the appropriate signatures in the storage already
if credentials_store
.get_expiration_date_signatures(ticketbook.expiration_date())
.get_expiration_date_signatures(ticketbook.expiration_date(), ticketbook.epoch_id())
.await
.map_err(|source| NymIdError::StorageError {
source: Box::new(source),
@@ -108,6 +108,7 @@ where
{
return Err(NymIdError::MissingExpirationDateSignatures {
date: ticketbook.expiration_date(),
epoch_id: ticketbook.epoch_id(),
});
}
+2
View File
@@ -14,6 +14,7 @@ pin-project = { workspace = true }
sha2 = { workspace = true }
snow = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util", "time"] }
tokio-util = { workspace = true, features = ["codec"] }
@@ -27,6 +28,7 @@ anyhow = { workspace = true }
tokio = { workspace = true, features = ["full"] }
rand_chacha = { workspace = true }
nym-crypto = { path = "../crypto", features = ["rand"] }
nym-test-utils = { path = "../test-utils" }
[lints]
+1 -1
View File
@@ -13,7 +13,7 @@ use nym_crypto::asymmetric::x25519;
use nym_noise_keys::{NoiseVersion, VersionedNoiseKey};
use snow::params::NoiseParams;
use strum::{EnumIter, FromRepr};
use strum_macros::{EnumIter, FromRepr};
#[derive(Default, Debug, Clone, Copy, EnumIter, FromRepr, Eq, PartialEq)]
#[repr(u8)]
+1 -1
View File
@@ -5,7 +5,7 @@ use crate::config::NoisePattern;
use crate::error::NoiseError;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use nym_noise_keys::NoiseVersion;
use strum::FromRepr;
use strum_macros::FromRepr;
#[derive(Debug)]
pub struct NymNoiseFrame {
+9 -117
View File
@@ -411,122 +411,21 @@ where
mod tests {
use super::*;
use nym_crypto::asymmetric::x25519;
use rand_chacha::rand_core::SeedableRng;
use std::io::Error;
use std::mem;
use nym_test_utils::helpers::deterministic_rng;
use nym_test_utils::mocks::async_read_write::mock_io_streams;
use nym_test_utils::traits::{Timeboxed, TimeboxedSpawnable};
use std::sync::Arc;
use std::task::{Context, Waker};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::join;
use tokio::sync::Mutex;
use tokio::time::timeout;
fn mock_streams() -> (MockStream, MockStream) {
let ch1 = Arc::new(Mutex::new(Default::default()));
let ch2 = Arc::new(Mutex::new(Default::default()));
(
MockStream {
inner: MockStreamInner {
tx: ch1.clone(),
rx: ch2.clone(),
},
},
MockStream {
inner: MockStreamInner { tx: ch2, rx: ch1 },
},
)
}
struct MockStream {
inner: MockStreamInner,
}
#[allow(dead_code)]
impl MockStream {
fn unchecked_tx_data(&self) -> Vec<u8> {
self.inner.tx.try_lock().unwrap().data.clone()
}
fn unchecked_rx_data(&self) -> Vec<u8> {
self.inner.rx.try_lock().unwrap().data.clone()
}
}
struct MockStreamInner {
tx: Arc<Mutex<DataWrapper>>,
rx: Arc<Mutex<DataWrapper>>,
}
#[derive(Default)]
struct DataWrapper {
data: Vec<u8>,
waker: Option<Waker>,
}
impl AsyncRead for MockStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let mut inner = self.inner.rx.try_lock().unwrap();
let data = mem::take(&mut inner.data);
if data.is_empty() {
inner.waker = Some(cx.waker().clone());
return Poll::Pending;
}
if let Some(waker) = inner.waker.take() {
waker.wake();
}
buf.put_slice(&data);
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for MockStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
let mut inner = self.inner.tx.try_lock().unwrap();
let len = buf.len();
if !inner.data.is_empty() {
assert!(inner.waker.is_none());
inner.waker = Some(cx.waker().clone());
return Poll::Pending;
}
inner.data.extend_from_slice(buf);
if let Some(waker) = inner.waker.take() {
waker.wake();
}
Poll::Ready(Ok(len))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
#[tokio::test]
async fn noise_handshake() -> anyhow::Result<()> {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let mut rng = deterministic_rng();
let initiator_keys = Arc::new(x25519::KeyPair::new(&mut rng));
let responder_keys = Arc::new(x25519::KeyPair::new(&mut rng));
let (initiator_stream, responder_stream) = mock_streams();
let (initiator_stream, responder_stream) = mock_io_streams();
let psk = generate_psk(*responder_keys.public_key(), NoiseVersion::V1)?;
let pattern = NoisePattern::default();
@@ -547,14 +446,8 @@ mod tests {
*responder_keys.public_key(),
);
let initiator_fut =
tokio::spawn(
async move { timeout(Duration::from_millis(200), stream_initiator).await },
);
let responder_fut =
tokio::spawn(
async move { timeout(Duration::from_millis(200), stream_responder).await },
);
let initiator_fut = stream_initiator.spawn_timeboxed();
let responder_fut = stream_responder.spawn_timeboxed();
let (initiator, responder) = join!(initiator_fut, responder_fut);
@@ -563,14 +456,13 @@ mod tests {
let msg = b"hello there";
// if noise was successful we should be able to write a proper message across
timeout(Duration::from_millis(200), initiator.write_all(msg)).await??;
initiator.write_all(msg).timeboxed().await??;
initiator.inner_stream.flush().await?;
let inner_buf = initiator.inner_stream.get_ref().unchecked_tx_data();
let mut buf = [0u8; 11];
timeout(Duration::from_millis(200), responder.read(&mut buf)).await??;
responder.read(&mut buf).timeboxed().await??;
assert_eq!(&buf[..], msg);
+1 -1
View File
@@ -233,7 +233,7 @@ where
let dkg_query_client = if self.config.base.client.disabled_credentials_mode {
None
} else {
Some(default_query_dkg_client_from_config(&self.config.base))
Some(default_query_dkg_client_from_config(&self.config.base)?)
};
let mut base_builder =
+1
View File
@@ -21,6 +21,7 @@ time = { workspace = true }
tokio = { workspace = true }
si-scale = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
nym-crypto = { path = "../crypto" }
nym-sphinx = { path = "../nymsphinx" }
+2 -2
View File
@@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize};
PartialEq,
Copy,
Clone,
strum::Display,
strum::EnumString,
strum_macros::Display,
strum_macros::EnumString,
Serialize,
Deserialize,
Default,
+5
View File
@@ -265,6 +265,7 @@ impl ShutdownManager {
}
#[must_use]
#[track_caller]
pub fn with_shutdown<F>(mut self, shutdown: F) -> Self
where
F: Future<Output = ()>,
@@ -281,6 +282,7 @@ impl ShutdownManager {
}
#[cfg(unix)]
#[track_caller]
pub fn with_shutdown_signal(self, signal_kind: SignalKind) -> std::io::Result<Self> {
let mut sig = signal(signal_kind)?;
Ok(self.with_shutdown(async move {
@@ -289,6 +291,7 @@ impl ShutdownManager {
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn with_interrupt_signal(self) -> Self {
self.with_shutdown(async move {
let _ = tokio::signal::ctrl_c().await;
@@ -296,11 +299,13 @@ impl ShutdownManager {
}
#[cfg(unix)]
#[track_caller]
pub fn with_terminate_signal(self) -> std::io::Result<Self> {
self.with_shutdown_signal(SignalKind::terminate())
}
#[cfg(unix)]
#[track_caller]
pub fn with_quit_signal(self) -> std::io::Result<Self> {
self.with_shutdown_signal(SignalKind::quit())
}
+2
View File
@@ -10,6 +10,7 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn<F>(future: F)
where
F: Future + Send + 'static,
@@ -18,6 +19,7 @@ where
tokio::spawn(future);
}
#[track_caller]
pub fn spawn_with_report_error<F, T, E>(future: F, mut shutdown: TaskClient)
where
F: Future<Output = Result<T, E>> + Send + 'static,
+23
View File
@@ -0,0 +1,23 @@
[package]
name = "nym-test-utils"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true }
futures = { workspace = true }
rand_chacha = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt"] }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
[lints]
workspace = true
+33
View File
@@ -0,0 +1,33 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::traits::Timeboxed;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;
use std::future::Future;
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
pub fn leak<T>(val: T) -> &'static mut T {
Box::leak(Box::new(val))
}
pub fn spawn_timeboxed<F>(fut: F) -> JoinHandle<Result<F::Output, Elapsed>>
where
F: Future + Send + 'static,
<F as Future>::Output: Send,
{
tokio::spawn(async move { fut.timeboxed().await })
}
pub fn deterministic_rng() -> ChaCha20Rng {
seeded_rng([42u8; 32])
}
pub fn seeded_rng(seed: [u8; 32]) -> ChaCha20Rng {
ChaCha20Rng::from_seed(seed)
}
pub fn u64_seeded_rng(seed: u64) -> ChaCha20Rng {
ChaCha20Rng::seed_from_u64(seed)
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod helpers;
pub mod mocks;
pub mod traits;
@@ -0,0 +1,161 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::mocks::shared::InnerWrapper;
use futures::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
// sending buffer of the first stream is the receiving buffer of the second stream
// and vice versa
pub fn mock_io_streams() -> (MockIOStream, MockIOStream) {
let ch1 = MockIOStream::default();
let ch2 = ch1.make_connection();
(ch1, ch2)
}
#[derive(Default)]
pub struct MockIOStream {
// messages to send
tx: InnerWrapper<Vec<u8>>,
// messages to receive
rx: InnerWrapper<Vec<u8>>,
}
impl MockIOStream {
fn make_connection(&self) -> Self {
MockIOStream {
tx: self.rx.cloned_buffer(),
rx: self.tx.cloned_buffer(),
}
}
// unwrap in test code is fine
#[allow(clippy::unwrap_used)]
pub fn unchecked_tx_data(&self) -> Vec<u8> {
self.tx.buffer.try_lock().unwrap().content.clone()
}
// unwrap in test code is fine
#[allow(clippy::unwrap_used)]
pub fn unchecked_rx_data(&self) -> Vec<u8> {
self.rx.buffer.try_lock().unwrap().content.clone()
}
}
impl AsyncRead for MockIOStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(Pin::new(&mut self.rx).poll_guard_ready(cx));
// SAFETY: guard is ready
#[allow(clippy::unwrap_used)]
let guard = self.rx.guard().unwrap();
let data = guard.take_content();
if data.is_empty() {
// nothing to retrieve - store the waiter so that the sender could trigger it
guard.waker = Some(cx.waker().clone());
// drop the guard so that the sender could actually put messages in
self.rx.transition_to_idle();
return Poll::Pending;
}
// if let Some(waker) = guard.waker.take() {
// waker.wake();
// }
self.rx.transition_to_idle();
buf.put_slice(&data);
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for MockIOStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// wait until we transition to the locked state
ready!(Pin::new(&mut self.tx).poll_guard_ready(cx));
// SAFETY: guard is ready
#[allow(clippy::unwrap_used)]
let guard = self.tx.guard().unwrap();
let len = buf.len();
guard.content.extend_from_slice(buf);
// TODO: if we wanted the behaviour of always reading everything before writing anything extra
// if !guard.content.is_empty() {
// // sanity check
// assert!(guard.waker.is_none());
// guard.waker = Some(cx.waker().clone());
// self.tx.transition_to_idle();
// return Poll::Pending;
// }
Poll::Ready(Ok(len))
}
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let Some(guard) = self.tx.guard() else {
return Poll::Ready(Err(io::Error::other(
"invalid lock state to send/flush messages",
)));
};
if let Some(waker) = guard.waker.take() {
// notify the receiver if it was waiting for messages
waker.wake();
}
// release the guard
self.tx.transition_to_idle();
Poll::Ready(Ok(()))
}
fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// make sure our guard is always dropped on close
self.tx.transition_to_idle();
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn basic() {
let (mut stream1, mut stream2) = mock_io_streams();
stream1.write_all(&[1, 2, 3, 4, 5]).await.unwrap();
stream1.flush().await.unwrap();
let mut buf = [0u8; 5];
let read = stream2.read(&mut buf).await.unwrap();
assert_eq!(read, 5);
assert_eq!(&buf[0..5], &[1, 2, 3, 4, 5]);
let mut buf = [0u8; 5];
stream2.write_all(&[6, 7, 8, 9, 10]).await.unwrap();
stream2.flush().await.unwrap();
let read = stream1.read(&mut buf).await.unwrap();
assert_eq!(read, 5);
assert_eq!(&buf[0..5], &[6, 7, 8, 9, 10]);
}
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod async_read_write;
mod shared;
pub mod stream_sink;
+109
View File
@@ -0,0 +1,109 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::future::BoxFuture;
use futures::{ready, FutureExt};
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use tokio::sync::{Mutex, OwnedMutexGuard};
#[derive(Default)]
pub(crate) struct InnerWrapper<T: 'static> {
pub(crate) buffer: Arc<Mutex<ContentWrapper<T>>>,
lock_state: LockState<T>,
}
impl<T: Send> InnerWrapper<T> {
pub(crate) fn clone_buffer(&self) -> Arc<Mutex<ContentWrapper<T>>> {
Arc::clone(&self.buffer)
}
pub(crate) fn cloned_buffer(&self) -> Self {
assert!(matches!(self.lock_state, LockState::Idle));
InnerWrapper {
buffer: self.clone_buffer(),
lock_state: LockState::Idle,
}
}
// NOTE: it's responsibility of the caller to ensure the guard is released and state transitions to idle!
pub(crate) fn poll_guard_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match &mut self.lock_state {
LockState::Idle => {
// 1. first try to obtain the guard without locking
let Ok(guard) = self.buffer.clone().try_lock_owned() else {
// 2. if that fails, create the future for obtaining it
self.lock_state =
LockState::TryingToLock(self.buffer.clone().lock_owned().boxed());
return Poll::Pending;
};
// correctly transition to locked state and poll ourselves again
self.lock_state = LockState::Locked(guard);
cx.waker().wake_by_ref();
Poll::Ready(())
}
LockState::TryingToLock(lock_fut) => {
// see if the guard future has resolved, if so, transition to locked state and schedule for another poll
let guard = ready!(lock_fut.as_mut().poll(cx));
self.lock_state = LockState::Locked(guard);
cx.waker().wake_by_ref();
Poll::Pending
}
LockState::Locked(_) => Poll::Ready(()),
}
}
pub(crate) fn guard(&mut self) -> Option<&mut OwnedMutexGuard<ContentWrapper<T>>> {
match &mut self.lock_state {
LockState::Locked(guard) => Some(guard),
_ => None,
}
}
pub(crate) fn transition_to_idle(&mut self) {
self.lock_state = LockState::Idle
}
}
#[derive(Default)]
pub(crate) enum LockState<T> {
// We havent started locking yet
#[default]
Idle,
// Waiting for the mutex lock future to resolve
TryingToLock(BoxFuture<'static, OwnedMutexGuard<ContentWrapper<T>>>),
// We hold the mutex guard
Locked(OwnedMutexGuard<ContentWrapper<T>>),
}
#[derive(Default)]
pub struct ContentWrapper<T> {
pub(crate) content: T,
pub(crate) waker: Option<Waker>,
}
impl<T> ContentWrapper<T> {
pub fn into_content(self) -> T {
self.content
}
pub fn content(&self) -> &T {
&self.content
}
pub(crate) fn take_content(&mut self) -> T
where
T: Default,
{
mem::take(&mut self.content)
}
}
impl<T> LockState<T> {}
+181
View File
@@ -0,0 +1,181 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::mocks::shared::{ContentWrapper, InnerWrapper};
use anyhow::{anyhow, bail};
use futures::{ready, Sink, Stream};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::Mutex;
// sending buffer of the first stream is the receiving buffer of the second stream
// and vice versa
pub fn mock_streams<T>() -> (MockStream<T>, MockStream<T>)
where
T: Send,
{
let ch1 = MockStream::default();
let ch2 = ch1.make_connection();
(ch1, ch2)
}
pub struct MockStream<T: 'static> {
// messages to send
tx: InnerWrapper<VecDeque<T>>,
// messages to receive
rx: InnerWrapper<VecDeque<T>>,
}
impl<T> MockStream<T> {
pub fn clone_tx_buffer(&self) -> Arc<Mutex<ContentWrapper<VecDeque<T>>>>
where
T: Send,
{
self.tx.clone_buffer()
}
pub fn clone_rx_buffer(&self) -> Arc<Mutex<ContentWrapper<VecDeque<T>>>>
where
T: Send,
{
self.rx.clone_buffer()
}
fn make_connection(&self) -> Self
where
T: Send,
{
MockStream {
tx: self.rx.cloned_buffer(),
rx: self.tx.cloned_buffer(),
}
}
}
impl<T> Default for MockStream<T> {
fn default() -> Self {
MockStream {
tx: InnerWrapper::default(),
rx: InnerWrapper::default(),
}
}
}
impl<T> Stream for MockStream<T>
where
T: Send,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
ready!(Pin::new(&mut self.rx).poll_guard_ready(cx));
// SAFETY: guard is ready
#[allow(clippy::unwrap_used)]
let guard = self.rx.guard().unwrap();
let Some(next) = guard.content.pop_front() else {
// nothing to retrieve - store the waiter so that the sender could trigger it
guard.waker = Some(cx.waker().clone());
// drop the guard so that the sender could actually put messages in
self.rx.transition_to_idle();
return Poll::Pending;
};
// there are more messages buffered waiting for us to retrieve
// keep the guard!
if !guard.content.is_empty() {
cx.waker().wake_by_ref();
} else {
// no more messages, drop the guard
self.rx.transition_to_idle();
}
Poll::Ready(Some(next))
}
fn size_hint(&self) -> (usize, Option<usize>) {
// that's just a minor optimisation, so don't sweat about it too much,
// if we can obtain the mutex, give precise information, otherwise return default values
let Ok(guard) = self.rx.buffer.try_lock() else {
return (0, None);
};
let items = guard.content.len();
(items, Some(items))
}
}
impl<T> Sink<T> for MockStream<T>
where
T: Send,
{
type Error = anyhow::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// wait until we transition to the locked state
ready!(Pin::new(&mut self.tx).poll_guard_ready(cx));
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let Some(guard) = self.tx.guard() else {
bail!("invalid lock state to send messages");
};
guard.content.push_back(item);
Ok(())
}
fn poll_flush(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let Some(guard) = self.tx.guard() else {
return Poll::Ready(Err(anyhow!("invalid lock state to send/flush messages")));
};
if let Some(waker) = guard.waker.take() {
// notify the receiver if it was waiting for messages
waker.wake();
}
// release the guard
self.tx.transition_to_idle();
Poll::Ready(Ok(()))
}
fn poll_close(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
// make sure our guard is always dropped on close
self.tx.transition_to_idle();
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{SinkExt, StreamExt};
#[tokio::test]
async fn basic() {
let (mut stream1, mut stream2) = mock_streams();
stream1.send("foomp").await.unwrap();
let received = stream2.next().await.unwrap();
assert_eq!(received, "foomp");
stream2.send("bar").await.unwrap();
let received = stream1.next().await.unwrap();
assert_eq!(received, "bar");
}
}
+57
View File
@@ -0,0 +1,57 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::helpers::{leak, spawn_timeboxed};
use std::future::{Future, IntoFuture};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
// a helper trait for use in tests to easily convert `T` into `&'static mut T`
pub trait Leak {
fn leak(self) -> &'static mut Self;
}
impl<T> Leak for T {
fn leak(self) -> &'static mut T {
leak(self)
}
}
// those are internal testing traits so we're not concerned about auto traits
#[allow(async_fn_in_trait)]
pub trait Timeboxed: IntoFuture + Sized {
async fn timeboxed(self) -> Result<Self::Output, Elapsed> {
self.execute_with_deadline(Duration::from_millis(200)).await
}
async fn execute_with_deadline(self, timeout: Duration) -> Result<Self::Output, Elapsed> {
tokio::time::timeout(timeout, self).await
}
}
impl<T> Timeboxed for T where T: IntoFuture + Sized {}
// those are internal testing traits so we're not concerned about auto traits
#[allow(async_fn_in_trait)]
pub trait Spawnable: Future + Sized + Send + 'static {
fn spawn(self) -> JoinHandle<Self::Output>
where
<Self as Future>::Output: Send + 'static,
{
tokio::spawn(self)
}
}
impl<T> Spawnable for T where T: Future + Sized + Send + 'static {}
pub trait TimeboxedSpawnable: Timeboxed + Spawnable {
fn spawn_timeboxed(self) -> JoinHandle<Result<<Self as Future>::Output, Elapsed>>
where
<Self as Future>::Output: Send,
{
spawn_timeboxed(self)
}
}
impl<T> TimeboxedSpawnable for T where T: Spawnable + Future + Send {}
+1
View File
@@ -19,6 +19,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
thiserror = { workspace = true }
ts-rs = { workspace = true }
url = { workspace = true }
+1 -1
View File
@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use strum::{Display, EnumString, VariantNames};
use strum_macros::{Display, EnumString, VariantNames};
#[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))]
#[cfg_attr(
+2 -1
View File
@@ -32,9 +32,11 @@ time = { workspace = true }
tracing = { workspace = true }
nym-authenticator-requests = { path = "../authenticator-requests" }
nym-credentials-interface = { path = "../credentials-interface" }
nym-credential-verification = { path = "../credential-verification" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-gateway-storage = { path = "../gateway-storage" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
@@ -46,4 +48,3 @@ nym-gateway-storage = { path = "../gateway-storage", features = ["mock"] }
[features]
default = []
mock = ["nym-gateway-storage/mock"]

Some files were not shown because too many files have changed in this diff Show More