Compare commits

...

33 Commits

Author SHA1 Message Date
Jędrzej Stuczyński dce0d161ea replace logger 2024-10-15 17:11:21 +01:00
Jędrzej Stuczyński 853a62bc5b added debug symbols to release builds 2024-10-14 18:53:09 +01:00
Jędrzej Stuczyński 1927614803 added temporary and experimental memory profiling 2024-10-11 14:34:28 +01:00
Jędrzej Stuczyński 75a5192c6d Merge pull request #4958 from nymtech/bugfix/websocket-message-handling
bugfix: replace unreachable macro with an error return
2024-10-09 17:24:17 +01:00
Jędrzej Stuczyński 25ad0920cf bugfix: replace unreachable macro with an error return 2024-10-09 17:15:41 +01:00
Bogdan-Ștefan Neacşu a4c6f51fe0 Don't kill gateway on handle drop (#4934) 2024-09-27 11:02:39 +02:00
benedetta davico f76300669a Merge pull request #4931 from nymtech/feature/wedel-merge-conflicts
Wedel release to develop
2024-09-26 13:46:24 +02:00
Jędrzej Stuczyński 333ace1f97 Merge branch 'release/2024.11-wedel' into feature/wedel-merge-conflicts 2024-09-26 08:56:11 +01:00
Dinko Zdravac 487bf6732e Assume offline mode in sqlx (#4926)
* Assume offline mode

* PR feedback
2024-09-25 13:28:36 +02:00
Jędrzej Stuczyński 5d4a0fef55 Merge pull request #4871 from nymtech/chore/remove-another-mixnet-migration
chore: remove queued migration for adding explicit admin
2024-09-25 09:37:49 +01:00
Jon Häggblad 1627146c0e Make ip-packet-request VERSION pub (#4925) 2024-09-25 09:56:32 +02:00
Dinko Zdravac ae40a00b8f Data Observatory stub (#4905)
* Data Observatory stub

* Fix sqlx in CI

* Add troubleshooting tips for sqlx

* Update CI paths to trigger for this package

* Add this to CI upload binary build
2024-09-24 16:48:15 +02:00
Jon Häggblad 7f3c0470e0 Fix argument to cargo-deny action (#4922) 2024-09-24 13:17:35 +02:00
Bogdan-Ștefan Neacşu 1bc26ed79f Expose error type (#4924) 2024-09-24 11:48:54 +02:00
mx 60fa5cfeb8 Max/rust sdk stream abstraction (#4743)
* add TcpProxyClient and TcpProxyServer abstractions to SDK 
* add single connection example
* add multi-connection example 
* add simple echo server to `tools/`: used for multi-connection example 
* update FFI toml files: switched to local imports 
* add proxy bindings to `ffi/shared`
* add proxy bindings and example to `ffi/go` 
* add note to `ffi/cpp` about lack of Proxy bindings for the moment
2024-09-24 09:29:46 +00:00
Jon Häggblad 3b7088aeea Fix nymvpn.com url in mainnet defaults (#4920) 2024-09-24 10:25:27 +02:00
Bogdan-Ștefan Neacşu 179d214e21 Check both version and type in message header (#4918)
* Move client type to the client code

* Check both version and type in header
2024-09-23 17:57:03 +02:00
Jon Häggblad 2a94ce6443 Bump http-api-client default timeout to 30 sec (#4917) 2024-09-23 15:45:47 +02:00
Bogdan-Ștefan Neacşu 95ec91daa1 Entry wireguard tickets (#4888)
* Create credential verifier in authenticator

* Add new version of peer storage with client id

* Fix v1 to what it was before

* Compact storage into ecash verifier

* Fix non-linux build

* Less overlapping conditions

* Remove moved code

* Use handler thread for each peer

* Re-spawn stored handles at startup

* Keep new function without async & Result

* Put query peer in function too

* Query bandwidth

* Fix clippy

* Replace tap with inspect_err

* Fix copyright year

* Handle version 2 on the reqeust deser

* Add protocol type in req/resp messages
2024-09-23 14:49:18 +02:00
benedettadavico 803850be74 bump versions & update changelog 2024-09-23 10:00:20 +02:00
Jędrzej Stuczyński 3dc62a9a60 Merge pull request #4892 from nymtech/bugfix/ticketbook-false-double-spending
Bugfix/ticketbook false double spending
2024-09-19 09:44:43 +01:00
Jędrzej Stuczyński 5753b79997 slightly refactored bandwidth tracking 2024-09-18 11:27:35 +01:00
Jędrzej Stuczyński 2a6aa13ecd fixed client bandwidth being not correctly deducted 2024-09-18 11:12:24 +01:00
benedetta davico d5c9e1d8cb Merge pull request #4899 from nymtech/jon/cherry-pick-4894-into-wedel
Backport #4894 to fix ci
2024-09-18 09:28:15 +02:00
Jon Häggblad 87751894d9 Fix apt install in ci-build-upload-binaries.yml (#4894) 2024-09-18 09:07:45 +02:00
Jędrzej Stuczyński c8c3928575 put client bandwidth (gateway-side) behind shared pointer 2024-09-17 18:40:24 +01:00
Jędrzej Stuczyński 2fa8da8117 making sure there can be only a single client task claiming more bandwidth 2024-09-17 18:39:49 +01:00
Jędrzej Stuczyński 4548ef4d05 adding extra logs 2024-09-17 18:39:01 +01:00
Jędrzej Stuczyński 7f147ee2b0 Merge pull request #4891 from nymtech/bugfix/ticketbook-aux-imports
fix: allow updating globally stored signatures
2024-09-17 15:33:34 +01:00
Jędrzej Stuczyński 48bcd7e802 fix: allow updating globally stored signatures 2024-09-17 14:21:42 +01:00
benedettadavico aad028be3f update qa env 2024-09-13 11:48:49 +02:00
Bogdan-Ștefan Neacşu 6db3b34bcb Bump defguard to github latest version (#4872)
* Bump defguard to github latest version

* Fix comment location
2024-09-12 13:49:33 +02:00
Jędrzej Stuczyński f9383578da chore: remove queued migration for adding explicit admin 2024-09-12 11:03:51 +01:00
127 changed files with 5843 additions and 887 deletions
@@ -26,6 +26,7 @@ on:
- "nym-api/**"
- "nym-node/**"
- "nym-outfox/**"
- 'nym-data-observatory/**'
- "nym-validator-rewarder/**"
- "sdk/rust/nym-sdk/**"
- "service-providers/**"
@@ -96,6 +97,7 @@ jobs:
target/release/nym-socks5-client
target/release/nym-api
target/release/nym-network-requester
target/release/nym-data-observatory
target/release/nym-cli
target/release/nymvisor
target/release/nym-node
@@ -113,6 +115,7 @@ jobs:
cp target/release/nym-socks5-client $OUTPUT_DIR
cp target/release/nym-api $OUTPUT_DIR
cp target/release/nym-network-requester $OUTPUT_DIR
cp target/release/nym-data-observatory $OUTPUT_DIR
cp target/release/nymvisor $OUTPUT_DIR
cp target/release/nym-node $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
+1
View File
@@ -16,6 +16,7 @@ on:
- 'nym-api/**'
- 'nym-node/**'
- 'nym-outfox/**'
- 'nym-data-observatory/**'
- 'nym-validator-rewarder/**'
- 'tools/**'
- 'wasm/**'
+1 -1
View File
@@ -22,4 +22,4 @@ jobs:
with:
log-level: warn
command: check ${{ matrix.checks }}
argument: --all-features
arguments: --all-features
+76
View File
@@ -4,6 +4,82 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.11-wedel] (2024-09-23)
- Backport #4894 to fix ci ([#4899])
- Bugfix/ticketbook false double spending ([#4892])
- fix: allow updating globally stored signatures ([#4891])
- [DOCs/operators]: Document changelog for patch/2024.10-caramello ([#4886])
- [DOCs/operators]: Post release docs updates ([#4874])
- Bump defguard to github latest version ([#4872])
- chore: removed completed queued mixnet migration ([#4865])
- Disable push trigger and add missing paths in ci-build ([#4864])
- Fix linux conditional in ci-build.yml ([#4863])
- Remove golang workaround in ci-sdk-wasm ([#4858])
- Revert runner for ci-docs ([#4855])
- Move credential verification into common crate ([#4853])
- Fix test failure in ipr request size ([#4844])
- Start switching over jobs to arc-ubuntu-20.04 ([#4843])
- Use ecash credential type for bandwidth value ([#4840])
- Create nym-repo-setup debian package and nym-vpn meta package ([#4837])
- Remove serde_crate named import ([#4832])
- Run cargo autoinherit following last weeks dependabot updates ([#4831])
- revamped ticketbook serialisation and exposed additional cli methods ([#4827])
- Expose wireguard details on self described endpoint ([#4825])
- Remove unused wireguard flag from SDK ([#4823])
- Add `axum` server to `nym-api` ([#4803])
- Run cargo-autoinherit for a few new crates ([#4801])
- Update dependabot ([#4796])
- Fix clippy for unwrap_or_default ([#4783])
- Enable dependabot version upgrades for root rust workspace ([#4778])
- Persist used wireguard private IPs ([#4771])
- Avoid race on ip and registration structures ([#4766])
- docs/hotfix ([#4765])
- chore: remove repetitive words ([#4763])
- Make gateway latency check generic ([#4759])
- Remove duplicate stat count for retransmissions ([#4756])
- Update peer refresh value ([#4754])
- Remove deprecated mark_as_success and use new disarm ([#4751])
- Add get_mixnodes_described to validator_client ([#4725])
- New Network Monitor ([#4610])
[#4899]: https://github.com/nymtech/nym/pull/4899
[#4892]: https://github.com/nymtech/nym/pull/4892
[#4891]: https://github.com/nymtech/nym/pull/4891
[#4886]: https://github.com/nymtech/nym/pull/4886
[#4874]: https://github.com/nymtech/nym/pull/4874
[#4872]: https://github.com/nymtech/nym/pull/4872
[#4865]: https://github.com/nymtech/nym/pull/4865
[#4864]: https://github.com/nymtech/nym/pull/4864
[#4863]: https://github.com/nymtech/nym/pull/4863
[#4858]: https://github.com/nymtech/nym/pull/4858
[#4855]: https://github.com/nymtech/nym/pull/4855
[#4853]: https://github.com/nymtech/nym/pull/4853
[#4844]: https://github.com/nymtech/nym/pull/4844
[#4843]: https://github.com/nymtech/nym/pull/4843
[#4840]: https://github.com/nymtech/nym/pull/4840
[#4837]: https://github.com/nymtech/nym/pull/4837
[#4832]: https://github.com/nymtech/nym/pull/4832
[#4831]: https://github.com/nymtech/nym/pull/4831
[#4827]: https://github.com/nymtech/nym/pull/4827
[#4825]: https://github.com/nymtech/nym/pull/4825
[#4823]: https://github.com/nymtech/nym/pull/4823
[#4803]: https://github.com/nymtech/nym/pull/4803
[#4801]: https://github.com/nymtech/nym/pull/4801
[#4796]: https://github.com/nymtech/nym/pull/4796
[#4783]: https://github.com/nymtech/nym/pull/4783
[#4778]: https://github.com/nymtech/nym/pull/4778
[#4771]: https://github.com/nymtech/nym/pull/4771
[#4766]: https://github.com/nymtech/nym/pull/4766
[#4765]: https://github.com/nymtech/nym/pull/4765
[#4763]: https://github.com/nymtech/nym/pull/4763
[#4759]: https://github.com/nymtech/nym/pull/4759
[#4756]: https://github.com/nymtech/nym/pull/4756
[#4754]: https://github.com/nymtech/nym/pull/4754
[#4751]: https://github.com/nymtech/nym/pull/4751
[#4725]: https://github.com/nymtech/nym/pull/4725
[#4610]: https://github.com/nymtech/nym/pull/4610
## [2024.10-caramello] (2024-09-10)
- Backport 4844 and 4845 ([#4857])
Generated
+532 -43
View File
File diff suppressed because it is too large Load Diff
+13 -2
View File
@@ -5,6 +5,7 @@
panic = "abort"
opt-level = "s"
overflow-checks = true
debug = true
[profile.dev]
panic = "abort"
@@ -81,6 +82,7 @@ members = [
"common/nyxd-scraper",
"common/pemstore",
"common/serde-helpers",
"common/service-provider-requests-common",
"common/socks5-client-core",
"common/socks5/proxy-helpers",
"common/socks5/requests",
@@ -102,6 +104,9 @@ members = [
"mixnode",
"sdk/lib/socks5-listener",
"sdk/rust/nym-sdk",
"sdk/ffi/shared",
"sdk/ffi/go",
"sdk/ffi/cpp",
"service-providers/authenticator",
"service-providers/common",
"service-providers/ip-packet-router",
@@ -110,11 +115,13 @@ members = [
"nym-api",
"nym-browser-extension/storage",
"nym-api/nym-api-requests",
"nym-data-observatory",
"nym-node",
"nym-node/nym-node-http-api",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/internal/ssl-inject",
# "tools/internal/sdk-version-bump",
"tools/internal/testnet-manager",
@@ -129,6 +136,9 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/echo-server",
]
default-members = [
@@ -138,6 +148,7 @@ default-members = [
"gateway",
"mixnode",
"nym-api",
"nym-data-observatory",
"nym-node",
"nym-validator-rewarder",
"service-providers/authenticator",
@@ -152,7 +163,6 @@ exclude = [
"nym-wallet",
"nym-vpn/ui/src-tauri",
"cpu-cycles",
"sdk/ffi/cpp",
]
[workspace.package]
@@ -212,7 +222,8 @@ ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
dashmap = "5.5.3"
defguard_wireguard_rs = "0.4.2"
# We want https://github.com/DefGuard/wireguard-rs/pull/64 , but there's no crates.io release being pushed out anymore
defguard_wireguard_rs = { git = "https://github.com/DefGuard/wireguard-rs.git", rev = "v0.4.7" }
digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.40"
version = "1.1.41"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.40"
version = "1.1.41"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+15
View File
@@ -9,9 +9,24 @@ edition.workspace = true
license.workspace = true
[dependencies]
base64 = { workspace = true }
bincode = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-service-provider-requests-common = { path = "../service-provider-requests-common" }
nym-sphinx = { path = "../nymsphinx" }
nym-wireguard-types = { path = "../wireguard-types" }
## verify:
hmac = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
x25519-dalek = { workspace = true, features = ["static_secrets"] }
[features]
default = ["verify"]
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
verify = ["hmac", "sha2"]
@@ -0,0 +1,22 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("the provided base64-encoded client MAC ('{mac}') was malformed: {source}")]
MalformedClientMac {
mac: String,
#[source]
source: base64::DecodeError,
},
#[cfg(feature = "verify")]
#[error("failed to verify mac provided by '{client}': {source}")]
FailedClientMacVerification {
client: String,
#[source]
source: hmac::digest::MacError,
},
}
+7 -1
View File
@@ -2,8 +2,14 @@
// SPDX-License-Identifier: Apache-2.0
pub mod v1;
pub mod v2;
pub const CURRENT_VERSION: u8 = 1;
mod error;
pub use error::Error;
pub use v2 as latest;
pub const CURRENT_VERSION: u8 = 2;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
@@ -1,7 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod registration;
pub mod request;
pub mod response;
pub use registration::{ClientMac, GatewayClient, InitMessage, Nonce};
#[cfg(feature = "verify")]
pub use registration::HmacSha256;
const VERSION: u8 = 1;
@@ -0,0 +1,218 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::SystemTime;
use std::{fmt, ops::Deref, str::FromStr};
#[cfg(feature = "verify")]
use hmac::{Hmac, Mac};
#[cfg(feature = "verify")]
use nym_crypto::asymmetric::encryption::PrivateKey;
#[cfg(feature = "verify")]
use sha2::Sha256;
pub type PendingRegistrations = HashMap<PeerPublicKey, RegistrationData>;
pub type PrivateIPs = HashMap<IpAddr, Taken>;
#[cfg(feature = "verify")]
pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: i64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InitMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
pub gateway_data: GatewayClient,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistredData {
pub pub_key: PeerPublicKey,
pub private_ip: IpAddr,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: u64,
pub suspended: bool,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Assigned private IP
pub private_ip: IpAddr,
/// Sha256 hmac on the data (alongside the prior nonce)
pub mac: ClientMac,
}
impl GatewayClient {
#[cfg(feature = "verify")]
pub fn new(
local_secret: &PrivateKey,
remote_public: x25519_dalek::PublicKey,
private_ip: IpAddr,
nonce: u64,
) -> Self {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(local_secret.to_bytes());
let local_public: x25519_dalek::PublicKey = (&static_secret).into();
let dh = static_secret.diffie_hellman(&remote_public);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(local_public.as_bytes());
mac.update(private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
GatewayClient {
pub_key: PeerPublicKey::new(local_public),
private_ip,
mac: ClientMac(mac.finalize().into_bytes().to_vec()),
}
}
// Reusable secret should be gateways Wireguard PK
// Client should perform this step when generating its payload, using its own WG PK
#[cfg(feature = "verify")]
pub fn verify(&self, gateway_key: &PrivateKey, nonce: u64) -> Result<(), Error> {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(gateway_key.to_bytes());
let dh = static_secret.diffie_hellman(&self.pub_key);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(self.pub_key.as_bytes());
mac.update(self.private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
mac.verify_slice(&self.mac)
.map_err(|source| Error::FailedClientMacVerification {
client: self.pub_key.to_string(),
source,
})
}
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
// TODO: change the inner type into generic array of size HmacSha256::OutputSize
// TODO2: rely on our internal crypto/hmac
#[derive(Debug, Clone)]
pub struct ClientMac(Vec<u8>);
impl fmt::Display for ClientMac {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", general_purpose::STANDARD.encode(&self.0))
}
}
impl ClientMac {
#[allow(dead_code)]
pub fn new(mac: Vec<u8>) -> Self {
ClientMac(mac)
}
}
impl Deref for ClientMac {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromStr for ClientMac {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mac_bytes: Vec<u8> =
general_purpose::STANDARD
.decode(s)
.map_err(|source| Error::MalformedClientMac {
mac: s.to_string(),
source,
})?;
Ok(ClientMac(mac_bytes))
}
}
impl Serialize for ClientMac {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let encoded_key = general_purpose::STANDARD.encode(self.0.clone());
serializer.serialize_str(&encoded_key)
}
}
impl<'de> Deserialize<'de> for ClientMac {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let encoded_key = String::deserialize(deserializer)?;
ClientMac::from_str(&encoded_key).map_err(serde::de::Error::custom)
}
}
#[cfg(test)]
mod tests {
use super::*;
use nym_crypto::asymmetric::encryption;
#[test]
#[cfg(feature = "verify")]
fn client_request_roundtrip() {
let mut rng = rand::thread_rng();
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
let client_key_pair = encryption::KeyPair::new(&mut rng);
let nonce = 1234567890;
let client = GatewayClient::new(
client_key_pair.private_key(),
x25519_dalek::PublicKey::from(gateway_key_pair.public_key().to_bytes()),
"10.0.0.42".parse().unwrap(),
nonce,
);
assert!(client.verify(gateway_key_pair.private_key(), nonce).is_ok())
}
}
@@ -1,8 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{GatewayClient, InitMessage};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::{GatewayClient, InitMessage, PeerPublicKey};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
@@ -82,3 +83,24 @@ pub enum AuthenticatorRequestData {
Final(GatewayClient),
QueryBandwidth(PeerPublicKey),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_byte_version() {
let version = 2;
let data = AuthenticatorRequest {
version,
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = data.to_bytes().unwrap();
assert_eq!(*bytes.first().unwrap(), version);
}
}
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
@@ -0,0 +1,69 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use crate::{v1, v2};
impl From<v1::request::AuthenticatorRequest> for v2::request::AuthenticatorRequest {
fn from(authenticator_request: v1::request::AuthenticatorRequest) -> Self {
Self {
protocol: Protocol {
version: 2,
service_provider_type: ServiceProviderType::Authenticator,
},
data: authenticator_request.data.into(),
reply_to: authenticator_request.reply_to,
request_id: authenticator_request.request_id,
}
}
}
impl From<v1::request::AuthenticatorRequestData> for v2::request::AuthenticatorRequestData {
fn from(authenticator_request_data: v1::request::AuthenticatorRequestData) -> Self {
match authenticator_request_data {
v1::request::AuthenticatorRequestData::Initial(init_msg) => {
v2::request::AuthenticatorRequestData::Initial(init_msg.into())
}
v1::request::AuthenticatorRequestData::Final(gw_client) => {
v2::request::AuthenticatorRequestData::Final(gw_client.into())
}
v1::request::AuthenticatorRequestData::QueryBandwidth(pub_key) => {
v2::request::AuthenticatorRequestData::QueryBandwidth(pub_key)
}
}
}
}
impl From<v1::registration::InitMessage> for v2::registration::InitMessage {
fn from(init_msg: v1::registration::InitMessage) -> Self {
Self {
pub_key: init_msg.pub_key,
}
}
}
impl From<v1::registration::GatewayClient> for Box<v2::registration::FinalMessage> {
fn from(gw_client: v1::registration::GatewayClient) -> Self {
Box::new(v2::registration::FinalMessage {
gateway_client: gw_client.into(),
credential: None,
})
}
}
impl From<v1::registration::GatewayClient> for v2::registration::GatewayClient {
fn from(gw_client: v1::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v1::registration::ClientMac> for v2::registration::ClientMac {
fn from(mac: v1::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
@@ -0,0 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod conversion;
pub mod registration;
pub mod request;
pub mod response;
const VERSION: u8 = 2;
@@ -1,9 +1,10 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use crate::PeerPublicKey;
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
@@ -29,32 +30,26 @@ pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "camelCase")]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum ClientMessage {
Initial(InitMessage),
Final(GatewayClient),
Query(PeerPublicKey),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct InitMessage {
/// Base64 encoded x25519 public key
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FinalMessage {
/// Gateway client data
pub gateway_client: GatewayClient,
/// Ecash credential
pub credential: Option<CredentialSpendingData>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
@@ -71,24 +66,20 @@ pub struct RegistredData {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: u64,
pub suspended: bool,
pub available_bandwidth: i64,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub pub_key: PeerPublicKey,
/// Assigned private IP
pub private_ip: IpAddr,
/// Sha256 hmac on the data (alongside the prior nonce)
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub mac: ClientMac,
}
@@ -0,0 +1,116 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{FinalMessage, InitMessage};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorRequest {
pub protocol: Protocol,
pub data: AuthenticatorRequestData,
pub reply_to: Recipient,
pub request_id: u64,
}
impl AuthenticatorRequest {
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn new_initial_request(init_message: InitMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Initial(init_message),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_final_request(final_message: FinalMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Final(Box::new(final_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_query_request(peer_public_key: PeerPublicKey, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::QueryBandwidth(peer_public_key),
reply_to,
request_id,
},
request_id,
)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorRequestData {
Initial(InitMessage),
Final(Box<FinalMessage>),
QueryBandwidth(PeerPublicKey),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_bytes_protocol() {
let version = 2;
let data = AuthenticatorRequest {
protocol: Protocol { version, service_provider_type: ServiceProviderType::Authenticator },
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = *data.to_bytes().unwrap().first_chunk::<2>().unwrap();
assert_eq!(bytes, [version, ServiceProviderType::Authenticator as u8]);
}
}
@@ -0,0 +1,129 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorResponse {
pub protocol: Protocol,
pub data: AuthenticatorResponseData,
pub reply_to: Recipient,
}
impl AuthenticatorResponse {
pub fn new_pending_registration_success(
registration_data: RegistrationData,
request_id: u64,
reply_to: Recipient,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::PendingRegistration(PendingRegistrationResponse {
reply: registration_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_registered(
registred_data: RegistredData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::Registered(RegisteredResponse {
reply: registred_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_remaining_bandwidth(
remaining_bandwidth_data: Option<RemainingBandwidthData>,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::RemainingBandwidth(RemainingBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn recipient(&self) -> Recipient {
self.reply_to
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn id(&self) -> Option<u64> {
match &self.data {
AuthenticatorResponseData::PendingRegistration(response) => Some(response.request_id),
AuthenticatorResponseData::Registered(response) => Some(response.request_id),
AuthenticatorResponseData::RemainingBandwidth(response) => Some(response.request_id),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorResponseData {
PendingRegistration(PendingRegistrationResponse),
Registered(RegisteredResponse),
RemainingBandwidth(RemainingBandwidthResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingRegistrationResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistrationData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisteredResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistredData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RemainingBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: Option<RemainingBandwidthData>,
}
@@ -2,21 +2,37 @@
// SPDX-License-Identifier: Apache-2.0
use si_scale::helpers::bibytes2;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Clone, Default)]
pub(crate) struct BandwidthClaimGuard {
inner: Arc<ClientBandwidthInner>,
}
impl Drop for BandwidthClaimGuard {
fn drop(&mut self) {
let old = self.inner.claiming_more.swap(false, Ordering::SeqCst);
assert!(
old,
"critical failure: there were multiple BandwidthClaimGuard existing"
)
}
}
#[derive(Clone)]
pub struct ClientBandwidth {
inner: Arc<ClientBandwidthInner>,
}
#[derive(Default)]
struct ClientBandwidthInner {
/// the actual bandwidth amount (in bytes) available
available: AtomicI64,
/// flag to indicate whether this client is currently in the process of claiming additional bandwidth
claiming_more: AtomicBool,
/// defines the timestamp when the bandwidth information has been logged to the logs stream
last_logged_ts: AtomicI64,
@@ -29,11 +45,28 @@ impl ClientBandwidth {
ClientBandwidth {
inner: Arc::new(ClientBandwidthInner {
available: AtomicI64::new(0),
claiming_more: AtomicBool::new(false),
last_logged_ts: AtomicI64::new(0),
last_updated_ts: AtomicI64::new(0),
}),
}
}
pub(crate) fn begin_bandwidth_claim(&self) -> Option<BandwidthClaimGuard> {
if self
.inner
.claiming_more
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
Some(BandwidthClaimGuard {
inner: self.inner.clone(),
})
} else {
None
}
}
pub(crate) fn remaining(&self) -> i64 {
self.inner.available.load(Ordering::Acquire)
}
@@ -724,6 +724,11 @@ impl<C, St> GatewayClient<C, St> {
return Err(GatewayClientError::NoBandwidthControllerAvailable);
}
let Some(_claim_guard) = self.bandwidth.begin_bandwidth_claim() else {
debug!("there's already an existing bandwidth claim ongoing");
return Ok(());
};
warn!("Not enough bandwidth. Trying to get more bandwidth, this might take a while");
if !self.cfg.bandwidth.require_tickets {
info!("The client is running in disabled credentials mode - attempting to claim bandwidth without a credential");
@@ -171,10 +171,20 @@ impl SqliteEcashTicketbookManager {
data: &[u8],
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO master_verification_key(epoch_id, serialised_key, serialization_revision) VALUES (?, ?, ?)",
r#"
INSERT OR IGNORE INTO master_verification_key(epoch_id, serialised_key, serialization_revision) VALUES (?, ?, ?);
UPDATE master_verification_key
SET
serialised_key = ?,
serialization_revision = ?
WHERE epoch_id = ?
"#,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
epoch_id
)
.execute(&self.connection_pool)
.await?;
@@ -204,10 +214,20 @@ impl SqliteEcashTicketbookManager {
data: &[u8],
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO coin_indices_signatures(epoch_id, serialised_signatures, serialization_revision) VALUES (?, ?, ?)",
r#"
INSERT OR IGNORE INTO coin_indices_signatures(epoch_id, serialised_signatures, serialization_revision) VALUES (?, ?, ?);
UPDATE coin_indices_signatures
SET
serialised_signatures = ?,
serialization_revision = ?
WHERE epoch_id = ?
"#,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
epoch_id,
)
.execute(&self.connection_pool)
.await?;
@@ -240,13 +260,21 @@ impl SqliteEcashTicketbookManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO expiration_date_signatures(expiration_date, epoch_id, serialised_signatures, serialization_revision)
VALUES (?, ?, ?, ?)
INSERT OR IGNORE INTO expiration_date_signatures(expiration_date, epoch_id, serialised_signatures, serialization_revision)
VALUES (?, ?, ?, ?);
UPDATE expiration_date_signatures
SET
serialised_signatures = ?,
serialization_revision = ?
WHERE expiration_date = ?
"#,
expiration_date,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
expiration_date
)
.execute(&self.connection_pool)
.await?;
@@ -1,6 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::*;
use crate::BandwidthFlushingBehaviourConfig;
use crate::ClientBandwidth;
use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::Bandwidth;
use nym_gateway_requests::ServerResponse;
@@ -9,10 +12,6 @@ use si_scale::helpers::bibytes2;
use time::OffsetDateTime;
use tracing::*;
use crate::error::*;
use crate::BandwidthFlushingBehaviourConfig;
use crate::ClientBandwidth;
const FREE_TESTNET_BANDWIDTH_VALUE: Bandwidth = Bandwidth::new_unchecked(64 * 1024 * 1024 * 1024); // 64GB
#[derive(Clone)]
@@ -41,9 +40,13 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
}
}
pub async fn available_bandwidth(&self) -> i64 {
self.client_bandwidth.available().await
}
async fn sync_expiration(&mut self) -> Result<()> {
self.storage
.set_expiration(self.client_id, self.client_bandwidth.bandwidth.expiration)
.set_expiration(self.client_id, self.client_bandwidth.expiration().await)
.await?;
Ok(())
}
@@ -57,17 +60,17 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
self.increase_bandwidth(FREE_TESTNET_BANDWIDTH_VALUE, ecash_today())
.await?;
let available_total = self.client_bandwidth.bandwidth.bytes;
let available_total = self.client_bandwidth.available().await;
Ok(ServerResponse::Bandwidth { available_total })
}
#[instrument(skip_all)]
pub async fn try_use_bandwidth(&mut self, required_bandwidth: i64) -> Result<i64> {
if self.client_bandwidth.bandwidth.expired() {
if self.client_bandwidth.expired().await {
self.expire_bandwidth().await?;
}
let available_bandwidth = self.client_bandwidth.bandwidth.bytes;
let available_bandwidth = self.client_bandwidth.available().await;
if available_bandwidth < required_bandwidth {
return Err(Error::OutOfBandwidth {
@@ -86,8 +89,7 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
async fn expire_bandwidth(&mut self) -> Result<()> {
self.storage.reset_bandwidth(self.client_id).await?;
self.client_bandwidth.bandwidth = Default::default();
self.client_bandwidth.update_sync_data();
self.client_bandwidth.expire_bandwidth().await;
Ok(())
}
@@ -97,31 +99,31 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
///
/// * `amount`: amount to decrease the available bandwidth by.
async fn consume_bandwidth(&mut self, amount: i64) -> Result<()> {
self.client_bandwidth.bandwidth.bytes -= amount;
self.client_bandwidth.bytes_delta_since_sync -= amount;
self.client_bandwidth.decrease_bandwidth(amount).await;
// since we're going to be operating on a fair use policy anyway, even if we crash and let extra few packets
// through, that's completely fine
if self.client_bandwidth.should_sync(self.bandwidth_cfg) {
self.sync_bandwidth().await?;
if self.client_bandwidth.should_sync(self.bandwidth_cfg).await {
self.sync_storage_bandwidth().await?;
}
Ok(())
}
#[instrument(level = "trace", skip_all)]
async fn sync_bandwidth(&mut self) -> Result<()> {
async fn sync_storage_bandwidth(&mut self) -> Result<()> {
trace!("syncing client bandwidth with the underlying storage");
let updated = self
.storage
.increase_bandwidth(self.client_id, self.client_bandwidth.bytes_delta_since_sync)
.increase_bandwidth(
self.client_id,
self.client_bandwidth.delta_since_sync().await,
)
.await?;
trace!(updated);
self.client_bandwidth.bandwidth.bytes = updated;
self.client_bandwidth.update_sync_data();
self.client_bandwidth
.resync_bandwidth_with_storage(updated)
.await;
Ok(())
}
@@ -136,13 +138,14 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
bandwidth: Bandwidth,
expiration: OffsetDateTime,
) -> Result<()> {
self.client_bandwidth.bandwidth.bytes += bandwidth.value() as i64;
self.client_bandwidth.bytes_delta_since_sync += bandwidth.value() as i64;
self.client_bandwidth.bandwidth.expiration = expiration;
self.client_bandwidth
.increase_bandwidth(bandwidth.value() as i64, expiration)
.await;
// any increases to bandwidth should get flushed immediately
// (we don't want to accidentally miss somebody claiming a gigabyte voucher)
self.sync_expiration().await?;
self.sync_bandwidth().await
self.sync_storage_bandwidth().await?;
Ok(())
}
}
@@ -1,10 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::AvailableBandwidth;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5);
const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB
#[derive(Debug, Clone, Copy)]
pub struct BandwidthFlushingBehaviourConfig {
@@ -15,10 +20,25 @@ pub struct BandwidthFlushingBehaviourConfig {
pub client_bandwidth_max_delta_flushing_amount: i64,
}
#[derive(Debug, Clone, Copy)]
impl Default for BandwidthFlushingBehaviourConfig {
fn default() -> Self {
Self {
client_bandwidth_max_flushing_rate: DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE,
client_bandwidth_max_delta_flushing_amount:
DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT,
}
}
}
#[derive(Debug, Clone)]
pub struct ClientBandwidth {
inner: Arc<RwLock<ClientBandwidthInner>>,
}
#[derive(Debug)]
struct ClientBandwidthInner {
pub(crate) bandwidth: AvailableBandwidth,
pub(crate) last_flushed: OffsetDateTime,
pub(crate) last_synced: OffsetDateTime,
/// the number of bytes the client had during the last sync.
/// it is used to determine whether the current value should be synced with the storage
@@ -30,28 +50,74 @@ pub struct ClientBandwidth {
impl ClientBandwidth {
pub fn new(bandwidth: AvailableBandwidth) -> ClientBandwidth {
ClientBandwidth {
bandwidth,
last_flushed: OffsetDateTime::now_utc(),
bytes_at_last_sync: bandwidth.bytes,
bytes_delta_since_sync: 0,
inner: Arc::new(RwLock::new(ClientBandwidthInner {
bandwidth,
last_synced: OffsetDateTime::now_utc(),
bytes_at_last_sync: bandwidth.bytes,
bytes_delta_since_sync: 0,
})),
}
}
pub(crate) fn should_sync(&self, cfg: BandwidthFlushingBehaviourConfig) -> bool {
if self.bytes_delta_since_sync.abs() >= cfg.client_bandwidth_max_delta_flushing_amount {
pub(crate) async fn should_sync(&self, cfg: BandwidthFlushingBehaviourConfig) -> bool {
let guard = self.inner.read().await;
if guard.bytes_delta_since_sync.abs() >= cfg.client_bandwidth_max_delta_flushing_amount {
return true;
}
if self.last_flushed + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
if guard.last_synced + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
return true;
}
false
}
pub(crate) fn update_sync_data(&mut self) {
self.last_flushed = OffsetDateTime::now_utc();
self.bytes_at_last_sync = self.bandwidth.bytes;
self.bytes_delta_since_sync = 0;
pub(crate) async fn available(&self) -> i64 {
self.inner.read().await.bandwidth.bytes
}
pub(crate) async fn delta_since_sync(&self) -> i64 {
self.inner.read().await.bytes_delta_since_sync
}
pub(crate) async fn expiration(&self) -> OffsetDateTime {
self.inner.read().await.bandwidth.expiration
}
pub(crate) async fn expired(&self) -> bool {
self.expiration().await < ecash_today()
}
pub(crate) async fn decrease_bandwidth(&self, decrease: i64) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes -= decrease;
guard.bytes_delta_since_sync -= decrease;
}
pub(crate) async fn increase_bandwidth(&self, increase: i64, new_expiration: OffsetDateTime) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes += increase;
guard.bandwidth.expiration = new_expiration;
guard.bytes_delta_since_sync += increase;
}
pub(crate) async fn expire_bandwidth(&self) {
let mut guard = self.inner.write().await;
guard.bandwidth = AvailableBandwidth::default();
guard.last_synced = OffsetDateTime::now_utc();
guard.bytes_at_last_sync = 0;
guard.bytes_delta_since_sync = 0;
}
pub(crate) async fn resync_bandwidth_with_storage(&self, stored: i64) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes = stored;
guard.bytes_at_last_sync = stored;
guard.bytes_delta_since_sync = 0;
guard.last_synced = OffsetDateTime::now_utc();
}
}
@@ -73,6 +73,10 @@ where
self.shared_state.verification_key(epoch_id).await
}
pub fn storage(&self) -> &S {
&self.shared_state.storage
}
//Check for duplicate pay_info, then check the payment, then insert pay_info if everything succeeded
pub async fn check_payment(
&self,
+2 -2
View File
@@ -150,7 +150,7 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
Ok(self
.bandwidth_storage_manager
.client_bandwidth
.bandwidth
.bytes)
.available()
.await)
}
}
@@ -0,0 +1,10 @@
/*
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
ALTER TABLE wireguard_peer
ADD COLUMN client_id INTEGER REFERENCES clients(id) DEFAULT NULL;
ALTER TABLE wireguard_peer
DROP COLUMN suspended;
+41 -12
View File
@@ -227,12 +227,14 @@ pub trait Storage: Send + Sync {
/// # Arguments
///
/// * `peer`: wireguard peer data to be stored
/// * `suspended`: if peer exists, but it's currently suspended
/// * `with_client_id`: if the peer should have a corresponding client_id
/// (created with entry wireguard ticket) or live without one (or with an
/// exiting one), for temporary backwards compatibility.
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
suspended: bool,
) -> Result<(), StorageError>;
with_client_id: bool,
) -> Result<Option<i64>, StorageError>;
/// Tries to retrieve available bandwidth for the particular peer.
///
@@ -334,14 +336,23 @@ impl Storage for PersistentStorage {
client_address: DestinationAddressBytes,
shared_keys: &SharedGatewayKey,
) -> Result<i64, StorageError> {
let client_id = self
.client_manager
.insert_client(ClientType::EntryMixnet)
.await?;
let client_address_bs58 = client_address.as_base58_string();
let client_id = match self
.shared_key_manager
.client_id(&client_address_bs58)
.await
{
Ok(client_id) => client_id,
_ => {
self.client_manager
.insert_client(ClientType::EntryMixnet)
.await?
}
};
self.shared_key_manager
.insert_shared_keys(
client_id,
client_address.as_base58_string(),
client_address_bs58,
shared_keys.aes128_ctr_hmac_bs58().as_deref(),
shared_keys.aes256_gcm_siv().as_deref(),
)
@@ -640,12 +651,30 @@ impl Storage for PersistentStorage {
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
suspended: bool,
) -> Result<(), StorageError> {
with_client_id: bool,
) -> Result<Option<i64>, StorageError> {
let client_id = match self
.wireguard_peer_manager
.retrieve_peer(&peer.public_key.to_string())
.await?
{
Some(peer) => peer.client_id,
_ => {
if with_client_id {
Some(
self.client_manager
.insert_client(ClientType::EntryWireguard)
.await?,
)
} else {
None
}
}
};
let mut peer = WireguardPeer::from(peer.clone());
peer.suspended = suspended;
peer.client_id = client_id;
self.wireguard_peer_manager.insert_peer(&peer).await?;
Ok(())
Ok(client_id)
}
async fn get_wireguard_peer(
+2 -2
View File
@@ -116,7 +116,7 @@ pub struct WireguardPeer {
pub rx_bytes: i64,
pub persistent_keepalive_interval: Option<i64>,
pub allowed_ips: Vec<u8>,
pub suspended: bool,
pub client_id: Option<i64>,
}
impl From<defguard_wireguard_rs::host::Peer> for WireguardPeer {
@@ -146,7 +146,7 @@ impl From<defguard_wireguard_rs::host::Peer> for WireguardPeer {
&value.allowed_ips,
)
.unwrap_or_default(),
suspended: false,
client_id: None,
}
}
}
@@ -27,16 +27,16 @@ impl WgPeerManager {
pub(crate) async fn insert_peer(&self, peer: &WireguardPeer) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, suspended)
INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, client_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
UPDATE wireguard_peer
SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, suspended = ?
SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, client_id = ?
WHERE public_key = ?
"#,
peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.suspended,
peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.suspended,peer.public_key,
peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.public_key,
)
.execute(&self.connection_pool)
.await?;
@@ -78,7 +78,7 @@ impl WgPeerManager {
.await
}
/// Retrieve the wireguard peer with the provided public key from the storage.
/// Remove the wireguard peer with the provided public key from the storage.
///
/// # Arguments
///
+3 -1
View File
@@ -18,7 +18,9 @@ pub use user_agent::UserAgent;
mod user_agent;
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
// The timeout is relatively high as we are often making requests over the mixnet, where latency is
// high and chatty protocols take a while to complete.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
pub type PathSegments<'a> = &'a [&'a str];
pub type Params<'a, K, V> = &'a [(K, V)];
+1 -1
View File
@@ -2,4 +2,4 @@ pub mod conversion;
pub mod request;
pub mod response;
const VERSION: u8 = 6;
pub const VERSION: u8 = 6;
+1 -1
View File
@@ -3,4 +3,4 @@ pub mod request;
pub mod response;
pub mod signature;
const VERSION: u8 = 7;
pub const VERSION: u8 = 7;
+1 -1
View File
@@ -32,7 +32,7 @@ pub const NYXD_URL: &str = "https://rpc.nymtech.net";
pub const NYM_API: &str = "https://validator.nymtech.net/api/";
pub const NYXD_WS: &str = "wss://rpc.nymtech.net/websocket";
pub const EXPLORER_API: &str = "https://explorer.nymtech.net/api/";
pub const NYM_VPN_API: &str = "https://nymvpn.net/api/";
pub const NYM_VPN_API: &str = "https://nymvpn.com/api/";
// I'm making clippy mad on purpose, because that url HAS TO be updated and deployed before merging
pub const EXIT_POLICY_URL: &str =
@@ -0,0 +1,14 @@
[package]
name = "nym-service-provider-requests-common"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
serde = { workspace = true, features = ["derive"] }
@@ -0,0 +1,18 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[repr(u8)]
pub enum ServiceProviderType {
NetworkRequester = 0,
IpPacketRouter = 1,
Authenticator = 2,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Protocol {
pub version: u8,
pub service_provider_type: ServiceProviderType,
}
-18
View File
@@ -17,28 +17,10 @@ serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-network-defaults = { path = "../network-defaults" }
# feature-specific dependencies:
## verify:
hmac = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
## openapi:
utoipa = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
x25519-dalek = { workspace = true, features = ["static_secrets"] }
[dev-dependencies]
rand = { workspace = true }
nym-crypto = { path = "../crypto", features = ["rand"] }
[features]
default = ["verify"]
openapi = ["utoipa", "serde_json"]
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
verify = ["hmac", "sha2"]
-15
View File
@@ -5,13 +5,6 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("the provided base64-encoded client MAC ('{mac}') was malformed: {source}")]
MalformedClientMac {
mac: String,
#[source]
source: base64::DecodeError,
},
#[error("the provided base64-encoded client x25519 public key ('{pub_key}') was malformed: {source}")]
MalformedPeerPublicKeyEncoding {
pub_key: String,
@@ -24,12 +17,4 @@ pub enum Error {
pub_key: String,
decoded_length: usize,
},
#[cfg(feature = "verify")]
#[error("failed to verify mac provided by '{client}': {source}")]
FailedClientMacVerification {
client: String,
#[source]
source: hmac::digest::MacError,
},
}
-8
View File
@@ -4,19 +4,11 @@
pub mod config;
pub mod error;
pub mod public_key;
pub mod registration;
use std::time::Duration;
pub use config::Config;
pub use error::Error;
pub use public_key::PeerPublicKey;
pub use registration::{ClientMac, ClientMessage, GatewayClient, InitMessage, Nonce};
// To avoid any problems, keep this stale check time bigger (>2x) then the bandwidth cap
// reset time (currently that one is 24h, at UTC midnight)
pub const DEFAULT_PEER_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3 days
pub const DEFAULT_PEER_TIMEOUT_CHECK: Duration = Duration::from_secs(5); // 5 seconds
#[cfg(feature = "verify")]
pub use registration::HmacSha256;
+7 -3
View File
@@ -16,17 +16,21 @@ bincode = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
defguard_wireguard_rs = { workspace = true }
futures = { workspace = true }
# The latest version on crates.io at the time of writing this (6.0.0) has a
# version mismatch with x25519-dalek/curve25519-dalek that is resolved in the
# latest commit. So pick that for now.
x25519-dalek = { workspace = true }
ip_network = { workspace = true }
log.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
nym-authenticator-requests = { path = "../authenticator-requests" }
nym-credential-verification = { path = "../credential-verification" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-gateway-storage = { path = "../gateway-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
+9
View File
@@ -6,9 +6,18 @@ pub enum Error {
#[error("peers in wireguard don't match with in-memory ")]
PeerMismatch,
#[error("traffic byte data needs to be increasing")]
InconsistentConsumedBytes,
#[error("{0}")]
Defguard(#[from] defguard_wireguard_rs::error::WireguardInterfaceError),
#[error("internal {0}")]
Internal(String),
#[error("storage should have the requested bandwidht entry")]
MissingClientBandwidthEntry,
#[error("{0}")]
GatewayStorage(#[from] nym_gateway_storage::error::StorageError),
}
+29 -27
View File
@@ -13,12 +13,13 @@ use nym_crypto::asymmetric::encryption::KeyPair;
use nym_wireguard_types::Config;
use peer_controller::PeerControlRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, Sender};
const WG_TUN_NAME: &str = "nymwg";
pub(crate) mod error;
pub mod peer_controller;
pub mod peer_handle;
pub struct WgApiWrapper {
inner: WGApi,
@@ -43,15 +44,12 @@ impl Drop for WgApiWrapper {
pub struct WireguardGatewayData {
config: Config,
keypair: Arc<KeyPair>,
peer_tx: UnboundedSender<PeerControlRequest>,
peer_tx: Sender<PeerControlRequest>,
}
impl WireguardGatewayData {
pub fn new(
config: Config,
keypair: Arc<KeyPair>,
) -> (Self, UnboundedReceiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
pub fn new(config: Config, keypair: Arc<KeyPair>) -> (Self, Receiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::channel(1);
(
WireguardGatewayData {
config,
@@ -70,44 +68,45 @@ impl WireguardGatewayData {
&self.keypair
}
pub fn peer_tx(&self) -> &UnboundedSender<PeerControlRequest> {
pub fn peer_tx(&self) -> &Sender<PeerControlRequest> {
&self.peer_tx
}
}
pub struct WireguardData {
pub inner: WireguardGatewayData,
pub peer_rx: UnboundedReceiver<PeerControlRequest>,
pub peer_rx: Receiver<PeerControlRequest>,
}
/// Start wireguard device
#[cfg(target_os = "linux")]
pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>(
storage: St,
all_peers: Vec<nym_gateway_storage::models::WireguardPeer>,
task_client: nym_task::TaskClient,
wireguard_data: WireguardData,
control_tx: UnboundedSender<peer_controller::PeerControlResponse>,
) -> Result<std::sync::Arc<WgApiWrapper>, Box<dyn std::error::Error + Send + Sync + 'static>> {
use base64::{prelude::BASE64_STANDARD, Engine};
use defguard_wireguard_rs::{InterfaceConfiguration, WireguardInterfaceApi};
use ip_network::IpNetwork;
use peer_controller::PeerController;
let mut peers = vec![];
let mut suspended_peers = vec![];
for storage_peer in all_peers {
let suspended = storage_peer.suspended;
let peer = Peer::try_from(storage_peer)?;
if suspended {
suspended_peers.push(peer);
} else {
peers.push(peer);
}
}
use std::collections::HashMap;
use tokio::sync::RwLock;
let ifname = String::from(WG_TUN_NAME);
let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?;
let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len());
let peers = all_peers
.into_iter()
.map(Peer::try_from)
.collect::<Result<Vec<_>, _>>()?;
for peer in peers.iter() {
let bandwidth_manager =
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
}
wg_api.create_interface()?;
let interface_config = InterfaceConfiguration {
name: ifname.clone(),
@@ -115,6 +114,7 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
address: wireguard_data.inner.config().private_ip.to_string(),
port: wireguard_data.inner.config().announced_port as u32,
peers,
mtu: None,
};
wg_api.configure_interface(&interface_config)?;
@@ -130,16 +130,18 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
)]);
wg_api.configure_peer_routing(&[catch_all_peer])?;
let host = wg_api.read_interface_data()?;
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
let mut controller = PeerController::new(
storage,
wg_api.clone(),
interface_config.peers,
suspended_peers,
host,
peer_bandwidth_managers,
wireguard_data.inner.peer_tx.clone(),
wireguard_data.peer_rx,
control_tx,
task_client,
);
tokio::spawn(async move { controller.run(task_client).await });
tokio::spawn(async move { controller.run().await });
Ok(wg_api)
}
+232 -199
View File
@@ -1,259 +1,292 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use chrono::{Timelike, Utc};
use defguard_wireguard_rs::{host::Peer, key::Key, WireguardInterfaceApi};
use defguard_wireguard_rs::{
host::{Host, Peer},
key::Key,
WireguardInterfaceApi,
};
use futures::channel::oneshot;
use nym_authenticator_requests::{
v1::registration::BANDWIDTH_CAP_PER_DAY, v2::registration::RemainingBandwidthData,
};
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
ClientBandwidth,
};
use nym_gateway_storage::Storage;
use nym_wireguard_types::registration::{RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY};
use nym_wireguard_types::{DEFAULT_PEER_TIMEOUT, DEFAULT_PEER_TIMEOUT_CHECK};
use std::time::SystemTime;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::error::Error;
use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
pub enum PeerControlRequest {
AddPeer(Peer),
RemovePeer(Key),
QueryPeer(Key),
QueryBandwidth(Key),
}
pub enum PeerControlResponse {
AddPeer {
success: bool,
peer: Peer,
ticket_validation: bool,
response_tx: oneshot::Sender<AddPeerControlResponse>,
},
RemovePeer {
success: bool,
key: Key,
response_tx: oneshot::Sender<RemovePeerControlResponse>,
},
QueryPeer {
success: bool,
peer: Option<Peer>,
key: Key,
response_tx: oneshot::Sender<QueryPeerControlResponse>,
},
QueryBandwidth {
bandwidth_data: Option<RemainingBandwidthData>,
key: Key,
response_tx: oneshot::Sender<QueryBandwidthControlResponse>,
},
}
pub struct PeerController<St: Storage> {
storage: St,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
response_tx: mpsc::UnboundedSender<PeerControlResponse>,
wg_api: Arc<WgApiWrapper>,
timeout_check_interval: IntervalStream,
active_peers: HashMap<Key, Peer>,
suspended_peers: HashMap<Key, Peer>,
last_seen_bandwidth: HashMap<Key, u64>,
timeout_count: u8,
pub struct AddPeerControlResponse {
pub success: bool,
pub client_id: Option<i64>,
}
impl<St: Storage> PeerController<St> {
pub struct RemovePeerControlResponse {
pub success: bool,
}
pub struct QueryPeerControlResponse {
pub success: bool,
pub peer: Option<Peer>,
}
pub struct QueryBandwidthControlResponse {
pub success: bool,
pub bandwidth_data: Option<RemainingBandwidthData>,
}
pub struct PeerController<St: Storage + Clone + 'static> {
storage: St,
// used to receive commands from individual handles too
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
wg_api: Arc<WgApiWrapper>,
host_information: Arc<RwLock<Host>>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
timeout_check_interval: IntervalStream,
task_client: nym_task::TaskClient,
}
impl<St: Storage + Clone + 'static> PeerController<St> {
pub fn new(
storage: St,
wg_api: Arc<WgApiWrapper>,
peers: Vec<Peer>,
suspended_peers: Vec<Peer>,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
response_tx: mpsc::UnboundedSender<PeerControlResponse>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
) -> Self {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let active_peers: HashMap<Key, Peer> = peers
.into_iter()
.map(|peer| (peer.public_key.clone(), peer))
.collect();
let suspended_peers: HashMap<Key, Peer> = suspended_peers
.into_iter()
.map(|peer| (peer.public_key.clone(), peer))
.collect();
let last_seen_bandwidth = active_peers
.iter()
.map(|(k, p)| (k.clone(), p.rx_bytes + p.tx_bytes))
.chain(suspended_peers.keys().map(|k| (k.clone(), 0)))
.collect();
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
storage.clone(),
public_key.clone(),
host_information.clone(),
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
);
tokio::spawn(async move {
if let Err(e) = handle.run().await {
log::error!("Peer handle shut down ungracefully - {e}");
}
});
}
PeerController {
storage,
wg_api,
host_information,
bw_storage_managers,
request_tx,
request_rx,
response_tx,
timeout_check_interval,
active_peers,
suspended_peers,
last_seen_bandwidth,
timeout_count: 0,
task_client,
}
}
async fn check_stale_peer(
&self,
peer: &Peer,
current_timestamp: SystemTime,
) -> Result<bool, Error> {
if let Some(timestamp) = peer.last_handshake {
if let Ok(duration_since_handshake) = current_timestamp.duration_since(timestamp) {
if duration_since_handshake > DEFAULT_PEER_TIMEOUT {
self.storage
.remove_wireguard_peer(&peer.public_key.to_string())
.await?;
self.wg_api.inner.remove_peer(&peer.public_key)?;
return Ok(true);
}
}
}
Ok(false)
}
async fn check_suspend_peer(&mut self, peer: &Peer) -> Result<(), Error> {
let prev_peer = self
.active_peers
.get(&peer.public_key)
.ok_or(Error::PeerMismatch)?;
let data_usage =
(peer.rx_bytes + peer.tx_bytes).saturating_sub(prev_peer.rx_bytes + prev_peer.tx_bytes);
if data_usage > BANDWIDTH_CAP_PER_DAY {
self.storage.insert_wireguard_peer(peer, true).await?;
self.wg_api.inner.remove_peer(&peer.public_key)?;
self.active_peers
.remove_entry(&peer.public_key)
.ok_or(Error::PeerMismatch)?;
self.suspended_peers
.insert(peer.public_key.clone(), peer.clone());
} else {
// Update peer stored data
self.storage.insert_wireguard_peer(peer, false).await?;
}
Ok(())
}
async fn check_peers(&mut self) -> Result<(), Error> {
// Add 10 seconds to cover edge cases. At worst, we give ten free seconds worth of bandwidth
// by resetting the bandwidth twice
let reset = Utc::now().num_seconds_from_midnight() as u64
<= DEFAULT_PEER_TIMEOUT_CHECK.as_secs() + 10;
if reset {
for (_, peer) in self.suspended_peers.drain() {
self.wg_api.inner.configure_peer(&peer)?;
}
}
let host = self.wg_api.inner.read_interface_data()?;
self.last_seen_bandwidth = host
.peers
.iter()
.map(|(key, peer)| (key.clone(), peer.rx_bytes + peer.tx_bytes))
.collect();
// Do in-memory updates of bandwidth every DEFAULT_PEER_TIMEOUT_CHECK
// and storage updates every 5 * DEFAULT_PEER_TIMEOUT_CHECK, because in-memory
// is more important for client query preciseness
self.timeout_count = self.timeout_count % 5 + 1;
if !reset && self.timeout_count < 5 {
return Ok(());
}
if reset {
self.active_peers = host.peers;
for peer in self.active_peers.values() {
self.storage.insert_wireguard_peer(peer, false).await?;
}
} else {
let peers = self
// Function that should be used for peer insertion, to handle both storage and kernel interaction
pub async fn add_peer(&self, peer: &Peer, with_client_id: bool) -> Result<Option<i64>, Error> {
let client_id = self
.storage
.insert_wireguard_peer(peer, with_client_id)
.await?;
let ret = self.wg_api.inner.configure_peer(peer);
if ret.is_err() {
// Try to revert the insertion in storage
if self
.storage
.get_all_wireguard_peers()
.await?
.into_iter()
.map(Peer::try_from)
.collect::<Result<Vec<_>, _>>()?;
let current_timestamp = SystemTime::now();
for peer in peers {
if !self.check_stale_peer(&peer, current_timestamp).await? {
self.check_suspend_peer(&peer).await?;
}
.remove_wireguard_peer(&peer.public_key.to_string())
.await
.is_err()
{
log::error!("The storage has been corrupted. Wireguard peer {} will persist in storage indefinitely.", peer.public_key);
}
}
Ok(())
ret?;
Ok(client_id)
}
pub async fn run(&mut self, mut task_client: nym_task::TaskClient) {
// Function that should be used for peer removal, to handle both storage and kernel interaction
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
self.storage.remove_wireguard_peer(&key.to_string()).await?;
self.bw_storage_managers.remove(key);
let ret = self.wg_api.inner.remove_peer(key);
if ret.is_err() {
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
}
Ok(ret?)
}
pub async fn generate_bandwidth_manager(
storage: St,
public_key: &Key,
) -> Result<Option<BandwidthStorageManager<St>>, Error> {
if let Some(client_id) = storage
.get_wireguard_peer(&public_key.to_string())
.await?
.ok_or(Error::MissingClientBandwidthEntry)?
.client_id
{
let bandwidth = storage
.get_available_bandwidth(client_id)
.await?
.ok_or(Error::MissingClientBandwidthEntry)?;
Ok(Some(BandwidthStorageManager::new(
storage,
ClientBandwidth::new(bandwidth.into()),
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
)))
} else {
Ok(None)
}
}
async fn handle_add_request(
&mut self,
peer: &Peer,
with_client_id: bool,
) -> Result<Option<i64>, Error> {
let client_id = self.add_peer(peer, with_client_id).await?;
let bandwidth_storage_manager =
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let mut handle = PeerHandle::new(
self.storage.clone(),
peer.public_key.clone(),
self.host_information.clone(),
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
);
self.bw_storage_managers
.insert(peer.public_key.clone(), bandwidth_storage_manager);
tokio::spawn(async move {
if let Err(e) = handle.run().await {
log::error!("Peer handle shut down ungracefully - {e}");
}
});
Ok(client_id)
}
async fn handle_query_peer(&self, key: &Key) -> Result<Option<Peer>, Error> {
Ok(self
.storage
.get_wireguard_peer(&key.to_string())
.await?
.map(Peer::try_from)
.transpose()?)
}
async fn handle_query_bandwidth(
&self,
key: &Key,
) -> Result<Option<RemainingBandwidthData>, Error> {
let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) else {
return Ok(None);
};
let available_bandwidth = if let Some(bandwidth_storage_manager) = bandwidth_storage_manager
{
bandwidth_storage_manager
.read()
.await
.available_bandwidth()
.await
} else {
let peer = self
.host_information
.read()
.await
.peers
.get(key)
.ok_or(Error::PeerMismatch)?
.clone();
BANDWIDTH_CAP_PER_DAY.saturating_sub((peer.rx_bytes + peer.tx_bytes) as i64)
};
Ok(Some(RemainingBandwidthData {
available_bandwidth,
}))
}
pub async fn run(&mut self) {
loop {
tokio::select! {
_ = self.timeout_check_interval.next() => {
if let Err(e) = self.check_peers().await {
log::error!("Error while periodically checking peers: {:?}", e);
}
let Ok(host) = self.wg_api.inner.read_interface_data() else {
log::error!("Can't read wireguard kernel data");
continue;
};
*self.host_information.write().await = host;
}
_ = task_client.recv() => {
_ = self.task_client.recv() => {
log::trace!("PeerController handler: Received shutdown");
break;
}
msg = self.request_rx.recv() => {
match msg {
Some(PeerControlRequest::AddPeer(peer)) => {
if let Err(e) = self.storage.insert_wireguard_peer(&peer, false).await {
log::error!("Could not insert peer into storage: {:?}", e);
self.response_tx.send(PeerControlResponse::AddPeer { success: false }).ok();
continue;
Some(PeerControlRequest::AddPeer { peer, ticket_validation, response_tx }) => {
let ret = self.handle_add_request(&peer, ticket_validation).await;
if let Ok(client_id) = ret {
response_tx.send(AddPeerControlResponse { success: true, client_id }).ok();
} else {
response_tx.send(AddPeerControlResponse { success: false, client_id: None }).ok();
}
let success = if let Err(e) = self.wg_api.inner.configure_peer(&peer) {
log::error!("Could not configure peer: {:?}", e);
false
} else {
self.last_seen_bandwidth.insert(peer.public_key.clone(), peer.rx_bytes + peer.tx_bytes);
self.active_peers.insert(peer.public_key.clone(), peer);
true
};
self.response_tx.send(PeerControlResponse::AddPeer { success }).ok();
}
Some(PeerControlRequest::RemovePeer(peer_pubkey)) => {
if let Err(e) = self.storage.remove_wireguard_peer(&peer_pubkey.to_string()).await {
log::error!("Could not remove peer from storage: {:?}", e);
self.response_tx.send(PeerControlResponse::RemovePeer { success: false }).ok();
continue;
Some(PeerControlRequest::RemovePeer { key, response_tx }) => {
let success = self.remove_peer(&key).await.is_ok();
response_tx.send(RemovePeerControlResponse { success }).ok();
}
Some(PeerControlRequest::QueryPeer { key, response_tx }) => {
let ret = self.handle_query_peer(&key).await;
if let Ok(peer) = ret {
response_tx.send(QueryPeerControlResponse { success: true, peer }).ok();
} else {
response_tx.send(QueryPeerControlResponse { success: false, peer: None }).ok();
}
let success = if let Err(e) = self.wg_api.inner.remove_peer(&peer_pubkey) {
log::error!("Could not remove peer: {:?}", e);
false
} else {
self.active_peers.remove(&peer_pubkey);
self.suspended_peers.remove(&peer_pubkey);
true
};
self.response_tx.send(PeerControlResponse::RemovePeer { success }).ok();
}
Some(PeerControlRequest::QueryPeer(peer_pubkey)) => {
let (success, peer) = match self.storage.get_wireguard_peer(&peer_pubkey.to_string()).await {
Err(e) => {
log::error!("Could not query peer storage {e}");
(false, None)
},
Ok(None) => (true, None),
Ok(Some(storage_peer)) => {
match Peer::try_from(storage_peer) {
Ok(peer) => (true, Some(peer)),
Err(e) => {
log::error!("Could not parse storage peer {e}");
(false, None)
}
}
},
};
self.response_tx.send(PeerControlResponse::QueryPeer { success, peer }).ok();
}
Some(PeerControlRequest::QueryBandwidth(peer_pubkey)) => {
let msg = if self.suspended_peers.contains_key(&peer_pubkey) {
PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: 0, suspended: true }) }
} else if let Some(&consumed_bandwidth) = self.last_seen_bandwidth.get(&peer_pubkey) {
PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: BANDWIDTH_CAP_PER_DAY - consumed_bandwidth, suspended: false })}
Some(PeerControlRequest::QueryBandwidth { key, response_tx }) => {
let ret = self.handle_query_bandwidth(&key).await;
if let Ok(bandwidth_data) = ret {
response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data }).ok();
} else {
PeerControlResponse::QueryBandwidth { bandwidth_data: None }
};
self.response_tx.send(msg).ok();
response_tx.send(QueryBandwidthControlResponse { success: false, bandwidth_data: None }).ok();
}
}
None => {
log::trace!("PeerController [main loop]: stopping since channel closed");
+132
View File
@@ -0,0 +1,132 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
use nym_gateway_storage::models::WireguardPeer;
use nym_gateway_storage::Storage;
use nym_task::TaskClient;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageManager<St>>>;
pub struct PeerHandle<St> {
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
task_client: TaskClient,
}
impl<St: Storage + Clone + 'static> PeerHandle<St> {
pub fn new(
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
) -> Self {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let mut task_client = task_client.fork(format!("peer-{public_key}"));
task_client.disarm();
PeerHandle {
storage,
public_key,
host_information,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
task_client,
}
}
async fn remove_depleted_peer(&self) -> Result<bool, Error> {
log::debug!(
"Peer {} doesn't have bandwidth anymore, removing it",
self.public_key.to_string()
);
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send(PeerControlRequest::RemovePeer {
key: self.public_key.clone(),
response_tx,
})
.await
.map_err(|_| Error::Internal("peer controller shut down".to_string()))?;
let success = response_rx
.await
.map_err(|_| Error::Internal("peer controller didn't respond".to_string()))?
.success;
Ok(success)
}
async fn active_peer(&mut self, storage_peer: WireguardPeer) -> Result<bool, Error> {
let kernel_peer = self
.host_information
.read()
.await
.peers
.get(&self.public_key)
.ok_or(Error::PeerMismatch)?
.clone();
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
let spent_bandwidth = (kernel_peer.rx_bytes + kernel_peer.tx_bytes)
.checked_sub(storage_peer.rx_bytes as u64 + storage_peer.tx_bytes as u64)
.ok_or(Error::InconsistentConsumedBytes)?
.try_into()
.map_err(|_| Error::InconsistentConsumedBytes)?;
if bandwidth_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
let success = self.remove_depleted_peer().await?;
return Ok(!success);
}
} else {
let spent_bandwidth = kernel_peer.rx_bytes + kernel_peer.tx_bytes;
if spent_bandwidth >= BANDWIDTH_CAP_PER_DAY {
let success = self.remove_depleted_peer().await?;
return Ok(!success);
}
}
Ok(true)
}
pub async fn run(&mut self) -> Result<(), Error> {
while !self.task_client.is_shutdown() {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Some(peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
return Ok(());
};
if !self.active_peer(peer).await? {
log::debug!("Peer {:?} doesn't have bandwidth anymore, shutting down handle", self.public_key);
return Ok(());
}
}
_ = self.task_client.recv() => {
log::trace!("PeerHandle: Received shutdown");
}
}
}
Ok(())
}
}
+1 -3
View File
@@ -538,15 +538,13 @@ pub fn query(
#[entry_point]
pub fn migrate(
mut deps: DepsMut<'_>,
deps: DepsMut<'_>,
_env: Env,
msg: MigrateMsg,
) -> Result<Response, MixnetContractError> {
set_build_information!(deps.storage)?;
cw2::ensure_from_older_version(deps.storage, CONTRACT_NAME, CONTRACT_VERSION)?;
crate::queued_migrations::explicit_contract_admin(deps.branch())?;
// due to circular dependency on contract addresses (i.e. mixnet contract requiring vesting contract address
// and vesting contract requiring the mixnet contract address), if we ever want to deploy any new fresh
// environment, one of the contracts will HAVE TO go through a migration
+1 -19
View File
@@ -1,20 +1,2 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::mixnet_contract_settings::storage as mixnet_params_storage;
use cosmwasm_std::DepsMut;
use mixnet_contract_common::error::MixnetContractError;
pub(crate) fn explicit_contract_admin(deps: DepsMut) -> Result<(), MixnetContractError> {
// we need to read the deprecated field to migrate it over
#[allow(deprecated)]
// SAFETY: this value should ALWAYS exist on the first execution of this migration;
// as a matter of fact, it should ALWAYS continue existing until another migration
#[allow(clippy::expect_used)]
let existing_admin = mixnet_params_storage::CONTRACT_STATE
.load(deps.storage)?
.owner
.expect("the contract state is corrupt - there's no admin set");
mixnet_params_storage::ADMIN.set(deps, Some(existing_admin))?;
Ok(())
}
+4 -7
View File
@@ -11,16 +11,13 @@ STAKE_DENOM_DISPLAY=nyx
DENOMS_EXPONENT=6
MIXNET_CONTRACT_ADDRESS=n1hm4y6fzgxgu688jgf7ek66px6xkrtmn3gyk8fax3eawhp68c2d5qujz296
ECASH_CONTRACT_ADDRESS=n14y2x8a60knc5jjfeztt84kw8x8l5pwdgnqg256v0p9v4p7t2q6eswxyusw
GROUP_CONTRACT_ADDRESS=n1qp35fcj0v9u3trhaps5v9q0lc42t4m6aty2wryss75ee8zuqnsqqdcreyq
MULTISIG_CONTRACT_ADDRESS=n1qa4hswlcjmttulj0q9qa46jf64f93pecl6tydcsjldfe0hy5ju0sdmwzya
COCONUT_DKG_CONTRACT_ADDRESS=n1ayrk6wp6w5lf6njtnfjwljmtcc9vevv5sxwkz7uq24rp2pw67t0qhmmxdd
ECASH_CONTRACT_ADDRESS=n13xspq62y9gq6nueqmywxcdv2yep4p6nzv98w2889k25v3nhdy2dq2rkrk7
GROUP_CONTRACT_ADDRESS=n13l7rwuwktklrwskc7m6lv70zws07en85uma28j7dxwsz9y5hvvhspl7a2t
MULTISIG_CONTRACT_ADDRESS=n138c9pyf7f3hyx0j3t6vmsz7ultnw2wj0lu6hzndep9z5grgq9haqlc25k0
COCONUT_DKG_CONTRACT_ADDRESS=n1pk8jgr6y4c5k93gz7qf3xc0hvygmp7csk88c2tf8l39tkq6834wq2a6dtr
VESTING_CONTRACT_ADDRESS=n1jlzdxnyces4hrhqz68dqk28mrw5jgwtcfq0c2funcwrmw0dx9l9s8nnnvj
REWARDING_VALIDATOR_ADDRESS=n1rfvpsynktze6wvn6ldskj8xgwfzzk5v6pnff39
EXPLORER_API=https://qa-network-explorer.qa.nymte.ch/api
NYXD="https://qa-validator.qa.nymte.ch"
NYM_API="https://qa-nym-api.qa.nymte.ch/api"
DKG_TIME_CONFIGURATION="600,300,300,60,60,1209600"
EXIT_POLICY="https://nymtech.net/.wellknown/network-requester/exit-policy.txt"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "explorer-api"
version = "1.1.39"
version = "1.1.40"
edition = "2021"
license.workspace = true
@@ -27,7 +27,7 @@ use nym_gateway_storage::{error::StorageError, Storage};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_validator_client::coconut::EcashApiError;
use rand::{CryptoRng, Rng};
use rand::{random, CryptoRng, Rng};
use std::{process, time::Duration};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -236,11 +236,7 @@ where
enc_credential: Vec<u8>,
iv: Vec<u8>,
) -> Result<ServerResponse, RequestHandlingError> {
// TODO: change it into a span field instead once we move to tracing
debug!(
"handling e-cash bandwidth request from {}",
self.client.address
);
debug!("handling e-cash bandwidth request");
let credential = ClientControlRequest::try_from_enc_ecash_credential(
enc_credential,
@@ -253,7 +249,11 @@ where
self.bandwidth_storage_manager.clone(),
);
let available_total = verifier.verify().await?;
let available_total = verifier
.verify()
.await
.inspect_err(|verification_failure| debug!("{verification_failure}"))?;
trace!("available total bandwidth: {available_total}");
Ok(ServerResponse::Bandwidth { available_total })
}
@@ -340,20 +340,17 @@ where
&mut self,
ciphertext: Vec<u8>,
nonce: Vec<u8>,
) -> Message {
) -> Result<ServerResponse, RequestHandlingError> {
let Ok(req) = ClientRequest::decrypt(&ciphertext, &nonce, &self.client.shared_keys) else {
return RequestHandlingError::InvalidEncryptedTextRequest.into_error_message();
return Err(RequestHandlingError::InvalidEncryptedTextRequest);
};
match req {
ClientRequest::UpgradeKey {
hkdf_salt,
derived_key_digest,
} => self
.handle_key_upgrade(hkdf_salt, derived_key_digest)
.await
.into_ws_message(),
_ => RequestHandlingError::UnknownEncryptedTextRequest.into_error_message(),
} => self.handle_key_upgrade(hkdf_salt, derived_key_digest).await,
_ => Err(RequestHandlingError::UnknownEncryptedTextRequest),
}
}
@@ -366,59 +363,64 @@ where
/// * `raw_request`: raw message to handle.
async fn handle_text(&mut self, raw_request: String) -> Message {
trace!("text request");
match ClientControlRequest::try_from(raw_request) {
Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(),
Ok(request) => match request {
ClientControlRequest::EncryptedRequest { ciphertext, nonce } => {
self.handle_encrypted_text_request(ciphertext, nonce).await
}
ClientControlRequest::EcashCredential { enc_credential, iv } => self
.handle_ecash_bandwidth(enc_credential, iv)
.await
.into_ws_message(),
ClientControlRequest::BandwidthCredential { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
}
.into_error_message()
}
ClientControlRequest::BandwidthCredentialV2 { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
}
.into_error_message()
}
ClientControlRequest::ClaimFreeTestnetBandwidth => self
.bandwidth_storage_manager
.handle_claim_testnet_bandwidth()
.await
.map_err(|e| e.into())
.into_ws_message(),
ClientControlRequest::SupportedProtocol { .. } => self
.inner
.handle_supported_protocol_request()
.into_ws_message(),
other @ ClientControlRequest::Authenticate { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
}
.into_error_message()
}
other @ ClientControlRequest::RegisterHandshakeInitRequest { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
}
.into_error_message()
}
_ => RequestHandlingError::UnknownTextRequest.into_error_message(),
},
let request = match ClientControlRequest::try_from(raw_request) {
Ok(req) => {
debug!("received request of type {}", req.name());
req
}
Err(err) => {
debug!("request was malformed: {err}");
return RequestHandlingError::InvalidTextRequest(err).into_error_message();
}
};
match request {
ClientControlRequest::EncryptedRequest { ciphertext, nonce } => {
self.handle_encrypted_text_request(ciphertext, nonce).await
}
ClientControlRequest::EcashCredential { enc_credential, iv } => {
self.handle_ecash_bandwidth(enc_credential, iv).await
}
ClientControlRequest::BandwidthCredential { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
})
}
ClientControlRequest::BandwidthCredentialV2 { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
})
}
ClientControlRequest::ClaimFreeTestnetBandwidth => self
.bandwidth_storage_manager
.handle_claim_testnet_bandwidth()
.await
.map_err(|e| e.into()),
ClientControlRequest::SupportedProtocol { .. } => {
Ok(self.inner.handle_supported_protocol_request())
}
other @ ClientControlRequest::Authenticate { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
})
}
other @ ClientControlRequest::RegisterHandshakeInitRequest { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
})
}
_ => Err(RequestHandlingError::UnknownTextRequest),
}
.inspect(|res| debug!(response = ?res, "success"))
.inspect_err(|err| debug!(error = %err, "failure"))
.into_ws_message()
}
/// Handles pong message received from the client.
@@ -452,12 +454,13 @@ where
/// # Arguments
///
/// * `raw_request`: raw received websocket message.
#[instrument(level = "debug", skip_all,
fields(
client = %self.client.address.as_base58_string()
)
)]
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
// TODO: this should be added via tracing
debug!(
"handling request from {}",
self.client.address.as_base58_string()
);
trace!("new request");
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from
@@ -478,8 +481,8 @@ where
where
S: AsyncRead + AsyncWrite + Unpin,
{
let tag: u64 = rand::thread_rng().gen();
debug!("Got request to ping our connection: {}", tag);
let tag: u64 = random();
debug!("got request to ping our connection: {tag}");
self.inner
.send_websocket_message(Message::Ping(tag.to_be_bytes().to_vec()))
.await?;
@@ -73,6 +73,9 @@ pub(crate) enum InitialAuthenticationError {
#[error("Only 'Register' or 'Authenticate' requests are allowed")]
InvalidRequest,
#[error("received a Message of type {typ} which was not expected in this context")]
UnexpectedMessageType { typ: String },
#[error("Experienced connection error: {0}")]
ConnectionError(#[from] WsError),
@@ -420,7 +423,7 @@ where
// we can't handle clients with higher protocol than ours
// (perhaps we could try to negotiate downgrade on our end? sounds like a nice future improvement)
if client_protocol_version <= CURRENT_PROTOCOL_VERSION {
info!("the client is using exactly the same (or older) protocol version as we are. We're good to continue!");
debug!("the client is using exactly the same (or older) protocol version as we are. We're good to continue!");
Ok(CURRENT_PROTOCOL_VERSION)
} else {
let err = InitialAuthenticationError::IncompatibleProtocol {
@@ -861,9 +864,27 @@ where
Message::Binary(_) => {
return Err(InitialAuthenticationError::BinaryRequestWithoutAuthentication);
}
_ => unreachable!(
"the underlying tunsgenite stream should be handling other message types"
),
other => {
if other.is_ping() {
debug!("unexpected ping message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "ping".to_string(),
});
} else if other.is_pong() {
debug!("unexpected pong message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "pong".to_string(),
});
} else if other.is_close() {
debug!("unexpected close message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "close".to_string(),
});
}
// at this point this is definitely unreachable, but just in case, let's not panic...
return Err(InitialAuthenticationError::InvalidRequest);
}
};
text.parse()
+19 -13
View File
@@ -246,6 +246,7 @@ impl<St> Gateway<St> {
&mut self,
forwarding_channel: MixForwardingSender,
shutdown: TaskClient,
ecash_verifier: Arc<EcashManager<St>>,
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>>
where
St: Storage + Clone + 'static,
@@ -256,7 +257,6 @@ impl<St> Gateway<St> {
.ok_or(GatewayError::UnspecifiedAuthenticatorConfig)?;
let (router_tx, mut router_rx) = oneshot::channel();
let (auth_mix_sender, auth_mix_receiver) = mpsc::unbounded();
let (peer_response_tx, peer_response_rx) = tokio::sync::mpsc::unbounded_channel();
let router_shutdown = shutdown.fork("message_router");
let transceiver = LocalGateway::new(
*self.identity_keypair.public_key(),
@@ -286,8 +286,8 @@ impl<St> Gateway<St> {
opts.config.clone(),
wireguard_data.inner.clone(),
used_private_network_ips,
peer_response_rx,
)
.with_ecash_verifier(ecash_verifier)
.with_custom_gateway_transceiver(Box::new(transceiver))
.with_shutdown(shutdown.fork("authenticator"))
.with_wait_for_gateway(true)
@@ -322,7 +322,6 @@ impl<St> Gateway<St> {
all_peers,
shutdown,
wireguard_data,
peer_response_tx,
)
.await?;
@@ -342,6 +341,7 @@ impl<St> Gateway<St> {
&self,
_forwarding_channel: MixForwardingSender,
_shutdown: TaskClient,
_ecash_verifier: Arc<EcashManager<St>>,
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>> {
todo!("Authenticator is currently only supported on Linux");
}
@@ -616,14 +616,16 @@ impl<St> Gateway<St> {
.maximum_time_between_redemption,
};
let ecash_manager = EcashManager::new(
handler_config,
nyxd_client,
self.identity_keypair.public_key().to_bytes(),
shutdown.fork("EcashVerifier"),
self.storage.clone(),
)
.await?;
let ecash_verifier = Arc::new(
EcashManager::new(
handler_config,
nyxd_client,
self.identity_keypair.public_key().to_bytes(),
shutdown.fork("EcashVerifier"),
self.storage.clone(),
)
.await?,
);
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.fork("PacketForwarder"));
@@ -638,7 +640,7 @@ impl<St> Gateway<St> {
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shutdown.fork("websocket::Listener"),
Arc::new(ecash_manager),
ecash_verifier.clone(),
);
let nr_request_filter = if self.config.network_requester.enabled {
@@ -670,7 +672,11 @@ impl<St> Gateway<St> {
let _wg_api = if self.wireguard_data.is_some() {
let embedded_auth = self
.start_authenticator(mix_forwarding_channel, shutdown.fork("authenticator"))
.start_authenticator(
mix_forwarding_channel,
shutdown.fork("authenticator"),
ecash_verifier,
)
.await
.map_err(|source| GatewayError::AuthenticatorStartError { source })?;
active_clients_store.insert_embedded(embedded_auth.handle);
+8 -3
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.43"
version = "1.1.44"
authors = [
"Dave Hrycyszyn <futurechimp@users.noreply.github.com>",
"Jędrzej Stuczyński <andrew@nymtech.net>",
@@ -76,7 +76,7 @@ axum = { workspace = true, features = ["tokio"], optional = true }
axum-extra = { workspace = true, features = ["typed-header"], optional = true }
tower-http = { workspace = true, features = ["cors", "trace"], optional = true }
utoipa = { workspace = true, features = ["axum_extras", "time"], optional = true }
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true}
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true }
utoipauto = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing = { workspace = true, optional = true }
@@ -112,7 +112,7 @@ cw4 = { workspace = true }
nym-dkg = { path = "../common/dkg", features = ["cw-types"] }
nym-gateway-client = { path = "../common/client-libs/gateway-client" }
nym-inclusion-probability = { path = "../common/inclusion-probability" }
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"]}
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"] }
nym-vesting-contract-common = { path = "../common/cosmwasm-smart-contracts/vesting-contract" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
nym-multisig-contract-common = { path = "../common/cosmwasm-smart-contracts/multisig-contract" }
@@ -129,6 +129,10 @@ nym-node-requests = { path = "../nym-node/nym-node-requests" }
nym-types = { path = "../common/types" }
nym-http-api-common = { path = "../common/http-api-common", features = ["utoipa"] }
tikv-jemallocator = { version = "0.6", optional = true, features = ["profiling"] }
tikv-jemalloc-sys = { version = "0.6", optional = true, features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["use_std", "stats", "profiling"] }
[features]
no-reward = []
generate-ts = ["ts-rs"]
@@ -143,6 +147,7 @@ axum = ["dep:axum",
"nym-http-api-common/utoipa",
"nym-mixnet-contract-common/utoipa"
]
memory-prof = ["tikv-jemallocator", "tikv-jemalloc-ctl", "tikv-jemalloc-sys"]
[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+13 -2
View File
@@ -24,7 +24,7 @@ use circulating_supply_api::cache::CirculatingSupplyCache;
use clap::Parser;
use ecash::dkg::controller::DkgController;
use node_status_api::NodeStatusCache;
use nym_bin_common::logging::setup_logging;
use nym_bin_common::logging::{setup_logging, setup_tracing_logger};
use nym_config::defaults::NymNetworkDetails;
use nym_contract_cache::cache::NymContractCache;
use nym_sphinx::receiver::SphinxMessageReceiver;
@@ -44,6 +44,10 @@ pub(crate) mod nym_nodes;
mod status;
pub(crate) mod support;
#[cfg(feature = "memory-prof")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[cfg(feature = "axum")]
mod v2;
@@ -58,9 +62,16 @@ async fn main() -> Result<(), anyhow::Error> {
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
} else {
setup_tracing_logger();
}}
setup_logging();
// setup_tracing_logger();
// std::env::set_var("MALLOC_CONF", "prof:true,lg_prof_interval:28");
// setup_tracing_logger();
// TODO rocket: replace with tracing logger once rocket is eliminated from code
info!("Starting nym api...");
+131
View File
@@ -0,0 +1,131 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::RocketErrorResponse;
use okapi::openapi3::{OpenApi, Responses};
use rocket::http::Status;
use rocket::response::Responder;
use rocket::{response, Request, Route};
use rocket_okapi::gen::OpenApiGenerator;
use rocket_okapi::response::OpenApiResponderInner;
use rocket_okapi::settings::OpenApiSettings;
use rocket_okapi::util::ensure_status_code_exists;
use rocket_okapi::{openapi, openapi_get_routes_spec};
// code taken from https://github.dev/GreptimeTeam/greptimedb/blob/develop/src/cmd/src/bin/greptime.rs
#[cfg(feature = "memory-prof")]
pub mod memory_prof {
const PROF_DUMP: &[u8] = b"prof.dump\0";
// const OPT_PROF: &[u8] = b"opt.prof\0";
use anyhow::{bail, Context};
use nym_config::{must_get_home, DEFAULT_NYM_APIS_DIR, NYM_DIR};
use std::ffi::{c_char, CString};
use time::OffsetDateTime;
use tokio::fs::create_dir_all;
use tokio::io::AsyncReadExt;
pub async fn dump_profile() -> anyhow::Result<Vec<u8>> {
if !is_prof_enabled()? {
bail!("memory profiling is not enabled")
}
let now = OffsetDateTime::now_utc();
let dump_path = must_get_home()
.join(NYM_DIR)
.join(DEFAULT_NYM_APIS_DIR)
.join("memory_dumps")
.join(format!("{}", now.unix_timestamp()))
.join("nym-api.hprof");
let parent = dump_path.parent().unwrap();
create_dir_all(&parent).await?;
info!("using {} for the memory dump", dump_path.display());
let path = dump_path
.to_str()
.context("the temp dir contained invalid characters")?
.to_string();
let mut bytes = CString::new(path.as_str())
.context("could not construct a CString out of the path")?
.into_bytes_with_nul();
{
// #safety: we always expect a valid temp file path to write profiling data to.
let ptr = bytes.as_mut_ptr() as *mut c_char;
unsafe {
tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr).context(format!(
"failed to dump profiling data to {}",
dump_path.display()
))?
}
}
let mut f = tokio::fs::File::open(path.as_str())
.await
.context("failed to open the dump file")?;
let mut buf = vec![];
let _ = f
.read_to_end(&mut buf)
.await
.context("failed to read the dump file")?;
Ok(buf)
}
fn is_prof_enabled() -> anyhow::Result<bool> {
Ok(tikv_jemalloc_ctl::profiling::prof::read()?)
// Ok(unsafe {
// tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF)
// .context("failed to check the OPT_PROF")?
// })
}
}
pub struct BinaryResponse {
inner: Vec<u8>,
}
impl<'r, 'o: 'r> Responder<'r, 'o> for BinaryResponse {
fn respond_to(self, _req: &'r Request<'_>) -> response::Result<'o> {
let mut res = rocket::Response::new();
res.set_sized_body(self.inner.len(), std::io::Cursor::new(self.inner));
Ok(res)
}
}
impl OpenApiResponderInner for BinaryResponse {
fn responses(_gen: &mut OpenApiGenerator) -> rocket_okapi::Result<Responses> {
let mut responses = Responses::default();
ensure_status_code_exists(&mut responses, 200);
Ok(responses)
}
}
/// foomp
#[cfg(feature = "memory-prof")]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> Result<BinaryResponse, RocketErrorResponse> {
let dump_data = memory_prof::dump_profile()
.await
.map_err(|err| RocketErrorResponse::new(err.to_string(), Status::InternalServerError))?;
Ok(BinaryResponse { inner: dump_data })
}
#[cfg(not(feature = "memory-prof"))]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> RocketErrorResponse {
RocketErrorResponse::new("The 'mem-prof' feature is disabled", Status::NotImplemented)
}
pub(crate) fn api_status_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings:
mem_prof_handler
]
}
+3
View File
@@ -28,6 +28,8 @@ use rocket_okapi::swagger_ui::make_swagger_ui;
pub(crate) mod helpers;
pub(crate) mod openapi;
pub(crate) mod mem_prof;
pub(crate) async fn setup_rocket(
config: &Config,
network_details: NetworkDetails,
@@ -52,6 +54,7 @@ pub(crate) async fn setup_rocket(
"/api-status" => api_status_routes(&openapi_settings),
"/ecash" => ecash::routes_open_api(&openapi_settings, config.coconut_signer.enabled),
"" => nym_node_routes_deprecated(&openapi_settings),
"/prof" => mem_prof::api_status_routes(&openapi_settings),
// => when we move those routes, we'll need to add a redirection for backwards compatibility
"/unstable/nym-nodes" => nym_node_routes_next(&openapi_settings)
+40
View File
@@ -0,0 +1,40 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-data-observatory"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
nym-bin-common = { path = "../common/bin-common" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "offline"] }
tokio = { workspace = true, features = ["process"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros" ] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
+93
View File
@@ -0,0 +1,93 @@
# Making `sqlx` work
Some of the errors encountered and possible solutions.
## `The cargo feature offline has to be enabled to use SQLX_OFFLINE`
Did you enable `offline` **cargo feature** of `sqlx` dependency in your
`Cargo.toml`?
Also, it may happen if you have a version mismatch beetween
- `sqlx` cargo dependency in `Cargo.toml`
- `sqlx-cli` installed by cargo
To ensure correct version, do
```
cargo uninstall sqlx-cli
cargo install --version <version> sqlx-cli
```
where `<version>` matches the version of sqlx in your `Cargo.toml`.
## `Error: failed to connect to database: password authentication failed`
If it's in-code, make sure you don't "double authenticate", i.e.
- if username and password are already specified in `DATABASE_URL`
- then, you don't have to use
```rust
ConnectOptions::from_str(&database_Url)
.username() // unnecessary
.password() // unnecessary
```
If it's outside of code (i.e. when running `cargo check`)
- make sure password doesn't have any special characters that could be
interpreted by the command line/shell weirdly, like `$#\` etc.
## Cannot generate `sqlx-data.json`
In order for `sqlx` to generate schema for "offline" work (without DB
connection), as of `v0.6.3` you first **need an active DB connection**.
So make sure
- DB is running
- `DATABASE_URL` is set correctly
- `SQLX_OFFLINE` isn't exported to true
Then run `cargo sqlx prepare`
After you have the file, you can ignore `DATABASE_URL` and terminate the DB
instance. This file represents the DB schema, so when your migrations change,
you'll need to re-generate it
Make sure to commit the file to VCS if you want to avoid re-doing this again on
each machine (e.g. other developers, CI).
## Generated `sqlx-data.json` looks like this
```json
{
"db": "PostgreSQL"
}
```
after running `cargo sqlx prepare`
### Similar to:
```
warning: no queries found; do you have the `offline` feature enabled
```
### Possible solutions
- does your `sqlx-cli` version match `sqlx` version from `Cargo.toml`?
- do you have `offline` cargo feature enabled?
- make sure to `cargo clean` after these updates
## Any many, many more
- `EOF while parsing a value at line`
- `failed to find data for query`
### Possible solutions
- Usually a DB connection issue
- Retry everything
- Throw in a `cargo clean -p <your package>` for good measure
+58
View File
@@ -0,0 +1,58 @@
use anyhow::Result;
use sqlx::{Connection, PgConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File};
const POSTGRES_USER: &str = "nym";
const POSTGRES_PASSWORD: &str = "password123";
const POSTGRES_DB: &str = "data_obs_db";
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
#[tokio::main]
async fn main() -> Result<()> {
let db_url =
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
// if a live DB is reachable, use that
if PgConnection::connect(&db_url).await.is_ok() {
export_db_variables(&db_url)?;
println!("cargo::rustc-env=SQLX_OFFLINE=false");
run_migrations(&db_url).await?;
} else {
// by default, run in offline mode
println!("cargo::rustc-env=SQLX_OFFLINE=true");
}
rerun_if_changed();
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("POSTGRES_USER", POSTGRES_USER);
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
map.insert("POSTGRES_DB", POSTGRES_DB);
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo::rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
}
Ok(())
}
async fn run_migrations(db_url: &str) -> Result<()> {
let mut conn = PgConnection::connect(db_url).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
Ok(())
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
+13
View File
@@ -0,0 +1,13 @@
services:
postgres:
image: postgres:13
container_name: nym-data-observatory-pg
env_file:
- .env
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
@@ -0,0 +1,6 @@
CREATE TABLE responses (
id SERIAL PRIMARY KEY,
joke_id VARCHAR NOT NULL UNIQUE,
joke TEXT NOT NULL,
date_created INTEGER NOT NULL
);
+12
View File
@@ -0,0 +1,12 @@
#!/bin/bash
source .env
# Launching a container in such a way that it's destroyed after you detach from the terminal:
docker compose up
# docker exec -it nym-data-observatory-pg /bin/bash
# psql -U youruser -d yourdb
echo "Tearing down containers to have a clean slate"
docker compose down -v
+79
View File
@@ -0,0 +1,79 @@
{
"db": "PostgreSQL",
"249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Varchar",
"Text",
"Int4"
]
}
},
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;"
},
"aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b": {
"describe": {
"columns": [
{
"name": "joke_id",
"ordinal": 0,
"type_info": "Varchar"
},
{
"name": "joke",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "date_created",
"ordinal": 2,
"type_info": "Int4"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1"
},
"e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0": {
"describe": {
"columns": [
{
"name": "joke_id",
"ordinal": 0,
"type_info": "Varchar"
},
{
"name": "joke",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "date_created",
"ordinal": 2,
"type_info": "Int4"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": []
}
},
"query": "SELECT joke_id, joke, date_created FROM responses"
}
}
@@ -0,0 +1,61 @@
use core::str;
use serde::Deserialize;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::{self, DbPool};
const REFRESH_DELAY: Duration = Duration::from_secs(15);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = some_network_action(&db_pool).await {
tracing::error!(
"❌ Run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"✅ Run successful, sleeping for {}s...",
REFRESH_DELAY.as_secs()
);
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
#[derive(Deserialize, Debug)]
pub(crate) struct Response {
#[serde(rename(deserialize = "id"))]
pub(crate) joke_id: String,
pub(crate) joke: String,
#[serde(rename(deserialize = "status"))]
pub(crate) _status: u16,
}
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
// for demonstration purposes only. You should use reqwest if you need it
let output = Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg("https://icanhazdadjoke.com/")
.output()
.await?;
if !output.status.success() {
anyhow::bail!("Curl command failed with status: {}", output.status);
}
let response_str = str::from_utf8(&output.stdout)?;
let joke_response: Response = serde_json::from_str(response_str)?;
tracing::info!("{:?}", joke_response.joke);
db::queries::insert_joke(pool, joke_response.into()).await?;
Ok(())
}
+40
View File
@@ -0,0 +1,40 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) mod queries;
pub(crate) const DATABASE_URL_ENV_VAR: &str = "DATABASE_URL";
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = PgPool;
pub(crate) struct Storage {
pool: DbPool,
}
impl Storage {
pub async fn init() -> Result<Self> {
let connection_url = std::env::var(DATABASE_URL_ENV_VAR).map_err(anyhow::Error::from)?;
let connect_options = {
let mut connect_options = PgConnectOptions::from_str(&connection_url)?;
let connect_options = connect_options.disable_statement_logging();
(*connect_options).clone()
};
let pool = DbPool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
Ok(Storage { pool })
}
/// Cloning pool is cheap, it's the same underlying set of connections
pub async fn pool_owned(&self) -> DbPool {
self.pool.clone()
}
}
+22
View File
@@ -0,0 +1,22 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::background_task::Response;
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct JokeDto {
pub(crate) joke_id: String,
pub(crate) joke: String,
pub(crate) date_created: i32,
}
impl From<Response> for JokeDto {
fn from(value: Response) -> Self {
Self {
joke_id: value.joke_id,
joke: value.joke,
// casting not smart, can implicitly panic, don't do this in prod
date_created: chrono::offset::Utc::now().timestamp() as i32,
}
}
}
@@ -0,0 +1,39 @@
use crate::db::{models::JokeDto, DbPool};
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
sqlx::query!(
"INSERT INTO responses
(joke_id, joke, date_created)
VALUES
($1, $2, $3)
ON CONFLICT(joke_id) DO UPDATE SET
joke=excluded.joke,
date_created=excluded.date_created;",
joke.joke_id,
joke.joke,
joke.date_created as i32,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
sqlx::query_as!(
JokeDto,
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
joke_id
)
.fetch_one(pool)
.await
.map_err(anyhow::Error::from)
}
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
.fetch_all(pool)
.await
.map_err(anyhow::Error::from)
}
@@ -0,0 +1,5 @@
// group queries in files by theme
mod joke;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
@@ -0,0 +1,78 @@
use axum::{
extract::{Path, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::{
db::{
models::JokeDto,
queries::{self, select_joke_by_id},
},
http::{
error::{Error, HttpResult},
state::AppState,
},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(jokes))
.route("/:joke_id", axum::routing::get(joke_by_id))
.route("/fetch_another", axum::routing::get(fetch_another))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes",
responses(
(status = 200, body = Vec<JokeDto>)
)
)]
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
queries::select_all(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct JokeIdParam {
joke_id: String,
}
#[utoipa::path(
tag = "Dad Jokes",
get,
params(
JokeIdParam
),
path = "/v1/jokes/{joke_id}",
responses(
(status = 200, body = JokeDto)
)
)]
async fn joke_by_id(
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<JokeDto>> {
select_joke_by_id(state.db_pool(), &joke_id)
.await
.map(Json::from)
.map_err(|_| Error::not_found(joke_id))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes/fetch_another",
responses(
(status = 200, body = String)
)
)]
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
Ok(Json(String::from("Done boss, check the DB")))
}
@@ -0,0 +1,21 @@
use axum::{extract::State, Json, Router};
use crate::http::{error::HttpResult, state::AppState};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(mixnodes))
}
#[utoipa::path(
tag = "Mixnodes",
get,
path = "/v1/mixnodes",
responses(
(status = 200, body = String)
)
)]
async fn mixnodes(State(_state): State<AppState>) -> HttpResult<Json<serde_json::Value>> {
Ok(Json(
serde_json::json!({"message": "😎 Nothing to see here, move along 😎"}),
))
}
+79
View File
@@ -0,0 +1,79 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod jokes;
pub(crate) mod mixnodes;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
}
impl RouterBuilder {
pub(crate) fn with_default_routes() -> Self {
let router = Router::new()
.merge(
SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", api_docs::ApiDoc::openapi()),
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest(
"/v1",
Router::new()
.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes()),
);
Self {
unfinished_router: router,
}
}
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
RouterWithState {
router: self.finalize_routes().with_state(state),
}
}
fn finalize_routes(self) -> Router<AppState> {
// layers added later wrap earlier layers
self.unfinished_router
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(TraceLayer::new_for_http())
}
}
pub(crate) struct RouterWithState {
router: Router,
}
impl RouterWithState {
pub(crate) async fn build_server<A: ToSocketAddrs>(
self,
bind_address: A,
) -> anyhow::Result<HttpServer> {
tokio::net::TcpListener::bind(bind_address)
.await
.map(|listener| HttpServer::new(self.router, listener))
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
}
}
fn setup_cors() -> CorsLayer {
use axum::http::Method;
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
.allow_headers(tower_http::cors::Any)
.allow_credentials(false)
}
@@ -0,0 +1,91 @@
use axum::Router;
use core::net::SocketAddr;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
};
/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
nym_http_cache_ttl: u64,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(db_pool, nym_http_cache_ttl);
let router = router_builder.with_state(state);
let bind_addr = format!("0.0.0.0:{}", http_port);
let server = router.build_server(bind_addr).await?;
Ok(start_server(server))
}
fn start_server(server: HttpServer) -> ShutdownHandles {
// one copy is stored to trigger a graceful shutdown later
let shutdown_button = CancellationToken::new();
// other copy is given to server to listen for a shutdown
let shutdown_receiver = shutdown_button.clone();
let shutdown_receiver = shutdown_receiver.cancelled_owned();
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
ShutdownHandles {
server_handle,
shutdown_button,
}
}
pub(crate) struct ShutdownHandles {
server_handle: JoinHandle<std::io::Result<()>>,
shutdown_button: CancellationToken,
}
impl ShutdownHandles {
/// Send graceful shutdown signal to server and wait for server task to complete
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
self.shutdown_button.cancel();
match self.server_handle.await {
Ok(Ok(_)) => {
tracing::info!("HTTP server shut down without errors");
}
Ok(Err(err)) => {
tracing::error!("HTTP server terminated with: {err}");
anyhow::bail!(err)
}
Err(err) => {
tracing::error!("Server task panicked: {err}");
}
};
Ok(())
}
}
pub(crate) struct HttpServer {
router: Router,
listener: TcpListener,
}
impl HttpServer {
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
Self { router, listener }
}
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
// into_make_service_with_connect_info allows us to see client ip address
axum::serve(
self.listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(receiver)
.await
}
}
+10
View File
@@ -0,0 +1,10 @@
use utoipa::OpenApi;
use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-data-observatory/src")]
#[derive(OpenApi)]
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
pub(super) struct ApiDoc;
+23
View File
@@ -0,0 +1,23 @@
use crate::read_env_var;
#[derive(Debug)]
pub(crate) struct Config {
http_port: u16,
}
const HTTP_PORT_DEFAULT: u16 = 8000;
impl Config {
pub(crate) fn from_env() -> Self {
Self {
http_port: read_env_var("HTTP_PORT")
.unwrap_or(HTTP_PORT_DEFAULT.to_string())
.parse()
.unwrap_or(HTTP_PORT_DEFAULT),
}
}
pub(crate) fn http_port(&self) -> u16 {
self.http_port
}
}
+28
View File
@@ -0,0 +1,28 @@
pub(crate) type HttpResult<T> = Result<T, Error>;
pub(crate) struct Error {
message: String,
status: axum::http::StatusCode,
}
impl Error {
pub(crate) fn not_found(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::NOT_FOUND,
}
}
pub(crate) fn internal() -> Self {
Self {
message: String::from("Internal server error"),
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl axum::response::IntoResponse for Error {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
+6
View File
@@ -0,0 +1,6 @@
pub(crate) mod api;
pub(crate) mod api_docs;
pub(crate) mod config;
pub(crate) mod error;
pub(crate) mod server;
pub(crate) mod state;
+91
View File
@@ -0,0 +1,91 @@
use axum::Router;
use core::net::SocketAddr;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
};
/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(db_pool);
let router = router_builder.with_state(state);
let bind_addr = format!("0.0.0.0:{}", http_port);
let server = router.build_server(bind_addr).await?;
Ok(start_server(server))
}
fn start_server(server: HttpServer) -> ShutdownHandles {
// one copy is stored to trigger a graceful shutdown later
let shutdown_button = CancellationToken::new();
// other copy is given to server to listen for a shutdown
let shutdown_receiver = shutdown_button.clone();
let shutdown_receiver = shutdown_receiver.cancelled_owned();
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
ShutdownHandles {
server_handle,
shutdown_button,
}
}
pub(crate) struct ShutdownHandles {
server_handle: JoinHandle<std::io::Result<()>>,
shutdown_button: CancellationToken,
}
impl ShutdownHandles {
/// Send graceful shutdown signal to server and wait for server task to complete
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
self.shutdown_button.cancel();
match self.server_handle.await {
Ok(Ok(_)) => {
tracing::info!("HTTP server shut down without errors");
}
Ok(Err(err)) => {
tracing::error!("HTTP server terminated with: {err}");
anyhow::bail!(err)
}
Err(err) => {
tracing::error!("Server task panicked: {err}");
}
};
Ok(())
}
}
pub(crate) struct HttpServer {
router: Router,
listener: TcpListener,
}
impl HttpServer {
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
Self { router, listener }
}
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
// into_make_service_with_connect_info allows us to see client ip address
// in middleware, for logging, TLS, routing etc.
axum::serve(
self.listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(receiver)
.await
}
}
+16
View File
@@ -0,0 +1,16 @@
use crate::db::DbPool;
#[derive(Debug, Clone)]
pub(crate) struct AppState {
db_pool: DbPool,
}
impl AppState {
pub(crate) fn new(db_pool: DbPool) -> Self {
Self { db_pool }
}
pub(crate) fn db_pool(&self) -> &DbPool {
&self.db_pool
}
}
+43
View File
@@ -0,0 +1,43 @@
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{filter::Directive, EnvFilter};
pub(crate) fn setup_tracing_logger() {
fn directive_checked(directive: String) -> Directive {
directive.parse().expect("Failed to parse log directive")
}
let log_builder = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
// Don't display the event's target (module path)
.with_target(false);
let mut filter = EnvFilter::builder()
// if RUST_LOG isn't set, set default level
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
// these crates are more granularly filtered
let filter_crates = [
"nym_bin_common",
"nym_explorer_client",
"nym_network_defaults",
"nym_validator_client",
"reqwest",
"rustls",
"hyper",
"sqlx",
"h2",
"tendermint_rpc",
"tower_http",
"axum",
];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name)));
}
log_builder.with_env_filter(filter).init();
}
+52
View File
@@ -0,0 +1,52 @@
use nym_network_defaults::setup_env;
use nym_task::signal::wait_for_signal;
use crate::http::config;
mod background_task;
mod db;
mod http;
mod logging;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger();
// if dotenv file is present, load its values
// otherwise, default to mainnet
setup_env(Some(".env"));
let conf = config::Config::from_env();
tracing::debug!("Using config:\n{:?}", conf);
let storage = db::Storage::init().await?;
let db_pool = storage.pool_owned().await;
tokio::spawn(async move {
background_task::spawn_in_background(db_pool).await;
tracing::info!("Started task");
});
let shutdown_handles =
http::server::start_http_api(storage.pool_owned().await, conf.http_port())
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", conf.http_port());
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
// TODO dz move this to common
fn read_env_var(env_var: &str) -> anyhow::Result<String> {
std::env::var(env_var)
.map_err(|_| anyhow::anyhow!("You need to set {}", env_var))
.map(|value| {
tracing::trace!("{}={}", env_var, value);
value
})
}
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.1.7"
version = "1.1.8"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+3 -2
View File
@@ -30,12 +30,13 @@ fastrand = { workspace = true }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric", "rand"] }
nym-http-api-common = { path = "../../common/http-api-common" }
nym-node-requests = { path = "../nym-node-requests", default-features = false, features = ["openapi"] }
nym-node-requests = { path = "../nym-node-requests", default-features = false, features = [
"openapi",
] }
nym-task = { path = "../../common/task" }
nym-metrics = { path = "../../common/nym-metrics" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types", features = ["verify"] }
[dev-dependencies]
base64 = { workspace = true }
@@ -60,9 +60,6 @@ use utoipa_swagger_ui::SwaggerUi;
api_requests::v1::gateway::models::Wireguard,
api_requests::v1::gateway::models::ClientInterfaces,
api_requests::v1::gateway::models::WebSockets,
api_requests::v1::gateway::client_interfaces::wireguard::models::ClientMessage,
api_requests::v1::gateway::client_interfaces::wireguard::models::InitMessage,
api_requests::v1::gateway::client_interfaces::wireguard::models::GatewayClient,
api_requests::v1::mixnode::models::Mixnode,
api_requests::v1::network_requester::models::NetworkRequester,
api_requests::v1::network_requester::exit_policy::models::AddressPolicy,
+9 -4
View File
@@ -12,7 +12,7 @@ license.workspace = true
[dependencies]
base64 = { workspace = true }
celes = { workspace = true } # country codes
celes = { workspace = true } # country codes
humantime = { workspace = true }
humantime-serde = { workspace = true }
schemars = { workspace = true, features = ["preserve_order"] }
@@ -21,7 +21,10 @@ serde_json = { workspace = true }
time = { workspace = true, features = ["serde", "formatting", "parsing"] }
thiserror = { workspace = true }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric", "serde"] }
nym-crypto = { path = "../../common/crypto", features = [
"asymmetric",
"serde",
] }
nym-exit-policy = { path = "../../common/exit-policy" }
nym-wireguard-types = { path = "../../common/wireguard-types", default-features = false }
@@ -33,7 +36,9 @@ nym-http-api-client = { path = "../../common/http-api-client", optional = true }
## openapi:
utoipa = { workspace = true, optional = true }
nym-bin-common = { path = "../../common/bin-common", features = ["bin_info_schema"] }
nym-bin-common = { path = "../../common/bin-common", features = [
"bin_info_schema",
] }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
@@ -44,4 +49,4 @@ nym-crypto = { path = "../../common/crypto", features = ["rand"] }
[features]
default = ["client"]
client = ["nym-http-api-client", "async-trait"]
openapi = ["utoipa", "nym-bin-common/openapi", "nym-wireguard-types/openapi", "nym-exit-policy/openapi"]
openapi = ["utoipa", "nym-bin-common/openapi", "nym-exit-policy/openapi"]
@@ -1,6 +1,4 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub use nym_wireguard_types::{
ClientMac, ClientMessage, GatewayClient, InitMessage, Nonce, PeerPublicKey,
};
pub use nym_wireguard_types::PeerPublicKey;
+2 -2
View File
@@ -37,7 +37,7 @@ use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace};
use zeroize::Zeroizing;
@@ -274,7 +274,7 @@ impl ExitGatewayData {
pub struct WireguardData {
inner: WireguardGatewayData,
peer_rx: UnboundedReceiver<PeerControlRequest>,
peer_rx: mpsc::Receiver<PeerControlRequest>,
}
impl WireguardData {
+43 -55
View File
@@ -538,7 +538,7 @@ checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037"
dependencies = [
"camino",
"cargo-platform",
"semver 1.0.22",
"semver",
"serde",
"serde_json",
"thiserror",
@@ -1141,7 +1141,7 @@ dependencies = [
"cosmwasm-std",
"cw2",
"schemars",
"semver 1.0.22",
"semver",
"serde",
"thiserror",
]
@@ -1156,7 +1156,7 @@ dependencies = [
"cosmwasm-std",
"cw-storage-plus",
"schemars",
"semver 1.0.22",
"semver",
"serde",
"thiserror",
]
@@ -2158,7 +2158,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.0.0",
"indexmap 2.5.0",
"slab",
"tokio",
"tokio-util",
@@ -2190,9 +2190,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.14.0"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "heck"
@@ -2559,12 +2559,12 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.0.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
dependencies = [
"equivalent",
"hashbrown 0.14.0",
"hashbrown 0.14.5",
"serde",
]
@@ -3115,7 +3115,7 @@ dependencies = [
"log",
"pretty_env_logger",
"schemars",
"semver 0.11.0",
"semver",
"serde",
"utoipa",
"vergen",
@@ -3173,7 +3173,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"toml 0.7.6",
"toml 0.8.19",
"url",
]
@@ -3517,12 +3517,9 @@ dependencies = [
"base64 0.22.1",
"log",
"nym-config",
"nym-crypto",
"nym-network-defaults",
"serde",
"serde_json",
"thiserror",
"utoipa",
"x25519-dalek",
]
@@ -4555,7 +4552,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver 1.0.22",
"semver",
]
[[package]]
@@ -4798,31 +4795,13 @@ dependencies = [
[[package]]
name = "semver"
version = "0.11.0"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
dependencies = [
"serde",
]
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "serde"
version = "1.0.210"
@@ -4897,9 +4876,9 @@ dependencies = [
[[package]]
name = "serde_spanned"
version = "0.6.3"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186"
checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1"
dependencies = [
"serde",
]
@@ -5450,7 +5429,7 @@ dependencies = [
"raw-window-handle",
"regex",
"rfd",
"semver 1.0.22",
"semver",
"serde",
"serde_json",
"serde_repr",
@@ -5483,7 +5462,7 @@ dependencies = [
"cargo_toml",
"heck 0.4.1",
"json-patch",
"semver 1.0.22",
"semver",
"serde_json",
"tauri-utils",
"winres",
@@ -5504,7 +5483,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"semver 1.0.22",
"semver",
"serde",
"serde_json",
"sha2 0.10.8",
@@ -5587,7 +5566,7 @@ dependencies = [
"phf 0.10.1",
"proc-macro2",
"quote",
"semver 1.0.22",
"semver",
"serde",
"serde_json",
"serde_with",
@@ -5682,7 +5661,7 @@ dependencies = [
"serde",
"serde_json",
"tendermint 0.37.0",
"toml 0.8.2",
"toml 0.8.19",
"url",
]
@@ -5735,7 +5714,7 @@ dependencies = [
"pin-project",
"rand 0.8.5",
"reqwest 0.11.22",
"semver 1.0.22",
"semver",
"serde",
"serde_bytes",
"serde_json",
@@ -5954,21 +5933,21 @@ dependencies = [
[[package]]
name = "toml"
version = "0.8.2"
version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "185d8ab0dfbb35cf1399a6344d8484209c088f75f8f68230da55d48d95d43e3d"
checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.20.2",
"toml_edit 0.22.22",
]
[[package]]
name = "toml_datetime"
version = "0.6.3"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41"
dependencies = [
"serde",
]
@@ -5979,24 +5958,24 @@ version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap 2.0.0",
"indexmap 2.5.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.5.10",
]
[[package]]
name = "toml_edit"
version = "0.20.2"
version = "0.22.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338"
checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5"
dependencies = [
"indexmap 2.0.0",
"indexmap 2.5.0",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.6.20",
]
[[package]]
@@ -6213,7 +6192,7 @@ version = "4.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23"
dependencies = [
"indexmap 2.0.0",
"indexmap 2.5.0",
"serde",
"serde_json",
"utoipa-gen",
@@ -6870,6 +6849,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b"
dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
+5 -5
View File
@@ -1,7 +1,8 @@
[package]
name = "nym-cpp-ffi"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
license.workspace = true
[lib]
name = "nym_cpp_ffi"
@@ -11,13 +12,12 @@ crate-type = ["cdylib"]
# Async runtime
tokio = { version = "1", features = ["full"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-bin-common = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sphinx-anonymous-replies = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
lazy_static = "1.4.0"
# error handling
anyhow = "1.0.75"
# base58 en/decoding
bs58 = "0.5.0"
+23 -23
View File
@@ -1,46 +1,46 @@
# C++ FFI
> ⚠️ This is an initial version of this library in order to give developers something to experiment with. If you use this code to begin testing out Mixnet integration and run into issues, errors, or have feedback, please feel free to open an issue; feedback from developers trying to use it will help us improve it. If you have questions feel free to reach out via our [Matrix channel](https://matrix.to/#/#dev:nymtech.chat).
# C++ FFI
> ⚠️ This is an initial version of this library in order to give developers something to experiment with. If you use this code to begin testing out Mixnet integration and run into issues, errors, or have feedback, please feel free to open an issue; feedback from developers trying to use it will help us improve it. If you have questions feel free to reach out via our [Matrix channel](https://matrix.to/#/#dev:nymtech.chat).
This repo contains:
* `lib.rs`: an initial version of bindings for interacting with the Mixnet via the Rust SDK from C++. These are essentially match statements wrapping imported functions from the `nym-ffi-shared` lib allowing for nicer [error handling](#error-handling-).
* `main.cpp`: an example of using this library, relying on `Boost` for threads.
* `lib.rs`: an initial version of bindings for interacting with the Mixnet via the Rust SDK from C++. These are essentially match statements wrapping imported functions from the `nym-ffi-shared` lib allowing for nicer [error handling](#error-handling-).
* `main.cpp`: an example of using this library, relying on `Boost` for threads.
The example `.cpp` file is a simple example flow of:
* setting up Nym client logging
The example `.cpp` file is a simple example flow of:
* setting up Nym client logging
* creating an ephemeral Nym client (no key storage / persistent address - this will come in a future iteration)
* getting its [Nym address](https://nymtech.net/docs/clients/addressing-system.html)
* using that address to send a message to yourself via the Mixnet
* using that address to send a message to yourself via the Mixnet
* listen for and parse the incoming message for the `sender_tag` used for [anonymous replies with SURBs](https://nymtech.net/docs/architecture/traffic-flow.html#private-replies-using-surbs)
* send a reply to yourself using SURBs
## Installation
Prerequisites:
> Unlike the Go FFI code, this code does not yet have bindings for the TcpProxyClient/Server. This will happen in the future.
## Installation
Prerequisites:
* Rust
* C++
* C++
* [Boost](https://www.boost.org/) which can be installed with:
```
# Arch / Manjaro
yay -S boost boost-libs
# Arch / Manjaro
yay -S boost boost-libs
# Debian / Ubuntu
# Debian / Ubuntu
sudo apt install libboost-all-dev
```
## Usage
The `build.sh` script in the root of the repository speeds up the task of building and linking the Rust and C++ code.
* if want to quickly recompile your code run it as-is with `./build.sh`
* if you want to clean build both the Rust and C++ code after removing existing compiled binaries run it with the optional `clean` argument: `./build.sh clean`.
> Make sure to run the script from the root of the project directory.
The `build.sh` script in the root of the repository speeds up the task of building and linking the Rust and C++ code.
* if want to quickly recompile your code run it as-is with `./build.sh`
* if you want to clean build both the Rust and C++ code after removing existing compiled binaries run it with the optional `clean` argument: `./build.sh clean`.
This script will:
> Make sure to run the script from the root of the project directory.
This script will:
* (optionally if called with `clean` argument) remove existing Rust and C++ artifacts
* build `lib.rs` with the `--release` flag
* compile `main.cpp`, linking `lib.rs`
* compile `main.cpp`, linking `lib.rs`
* set value of `LD_LIBRARY_PATH` to the Rust code in `target/release/`
* run the compiled `main`
## Error Handling
## Error Handling
When calling a function across the FFI boundary (e.g.) `reply`, the Rust code is matching the output of an `_internal` function - `Res` or `Err` - to a member of the `StatusCode` enum. This allows for both Rust-style error handling and the ease of returning a `c_int` across the FFI boundary, which can be used by C++ for its own error handling / conditional logic.
+2 -3
View File
@@ -20,8 +20,8 @@ build_artifacts_and_link() {
cargo build --release &&
cd src/ &&
printf "compiling cpp \n"
g++ -std=c++11 -o main main.cpp -ldl -lpthread -L../target/release -lnym_cpp_ffi -lboost_thread &&
export LD_LIBRARY_PATH=../target/release:$LD_LIBRARY_PATH
g++ -std=c++11 -o main main.cpp -ldl -lpthread -L../../../../target/release -lnym_cpp_ffi -lboost_thread &&
export LD_LIBRARY_PATH=../../../../target/release:$LD_LIBRARY_PATH
# check output for name of rust lib - can be helpful if you've changed e.g. the name of a file and the compilation is failing
# printf "ldd main: \n"
# ldd main
@@ -48,4 +48,3 @@ else
exit 1
fi
fi
+3 -2
View File
@@ -1,7 +1,8 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_ffi_shared;
// TODO REMOVE when you're working on new CPP branch
#![allow(clippy::all)]
// use nym_ffi_shared;
use std::ffi::{c_char, c_int, CStr, CString};
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
+6 -1
View File
@@ -1,12 +1,17 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// TODO REMOVE when you're working on new CPP branch
#![allow(clippy::all)]
pub mod types {
use std::ffi::c_char;
// TODO change all the numbers / replace -2 with prxy?
#[derive(Debug)]
pub enum StatusCode {
NoError = 0,
ClientInitError = -1,
ClientUninitialisedError = -2,
// ClientUninitialisedError = -2,
SelfAddrError = -3,
SendMsgError = -4,
ReplyError = -5,
+9 -8
View File
@@ -1,19 +1,20 @@
[package]
name = "nym-go-ffi" #"goffitest"
version = "0.1.0"
name = "nym-go-ffi" #"goffitest"
version = "0.2.0"
edition = "2021"
license.workspace = true
[lib]
crate-type = ["cdylib"]
name = "nym_go_ffi" #"go_ffi"
name = "nym_go_ffi" #"go_ffi"
[dependencies]
# Bindgen
uniffi = { version = "0.25.2", features = ["cli"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-bin-common = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sdk = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sphinx-anonymous-replies = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
# Async runtime
tokio = { version = "1", features = ["full"] }
@@ -23,8 +24,8 @@ anyhow = "1.0.79"
thiserror = "1.0.56"
[build-dependencies]
uniffi = { version = "0.25.2", features = ["build" ] }
uniffi_build = { version = "0.25.2", features=["builtin-bindgen"] }
uniffi = { version = "0.25.2", features = ["build"] }
uniffi_build = { version = "0.25.2", features = ["builtin-bindgen"] }
[[bin]]
name = "uniffi-bindgen"
+16 -24
View File
@@ -1,27 +1,20 @@
# Go FFI
> ⚠️ This is an initial version of this library in order to give developers something to experiment with. If you use this code to begin testing out Mixnet integration and run into issues, errors, or have feedback, please feel free to open an issue; feedback from developers trying to use it will help us improve it. If you have questions feel free to reach out via our [Matrix channel](https://matrix.to/#/#dev:nymtech.chat).
This repo contains:
* `lib.rs`: an initial version of bindings for interacting with the Mixnet via the Rust SDK from Go. These are essentially match statemtns wrapping imported functions from the `nym-ffi-shared` lib.
* `ffi/`: a directory containing:
This repo contains:
* `lib.rs`: an initial version of bindings for interacting with the Mixnet via the Rust SDK from Go. These are essentially match statemtns wrapping imported functions from the `nym-ffi-shared` lib.
* `ffi/`: a directory containing:
* the `bindings/` files generated using [`uniffi-bindgen-go`](https://github.com/NordSecurity/uniffi-bindgen-go)
* [`example.go`](./example.go): an example of using this library.
* [`example.go`](./example.go): an example of using the mixnet client functionality.
* [`proxy_example.go`](./proxy_example.go): an example of using the TcpProxy functionality.
The `example.go` file is an example flow of:
* setting up Nym client logging
* creating an ephemeral Nym client (no key storage / persistent address - this will come in a future iteration)
* getting its [Nym address](https://nymtech.net/docs/clients/addressing-system.html)
* using that address to send a message to yourself via the Mixnet
* listen for and parse the incoming message for the `sender_tag` used for [anonymous replies with SURBs](https://nymtech.net/docs/architecture/traffic-flow.html#private-replies-using-surbs)
* send a reply to yourself using SURBs
## Useage - Consuming the Library
You can import the bindings as normal and interact with them as shown in the example files. These files import the bindings from this repository (hence the `go.mod` and `go.sum` in the crate root) but you can import them remotely as usual.
## Useage - Consuming the Library
You can import the bindings as normal and interact with them as shown in the [example file](./example.go). This example imports the bindings from the this repository (hence the `go.mod` and `go.sum` in the crate root) but you can import them remotely as usual.
## Useage - Developing on the Library
If you want to fork and add new features/functions to this library use the following instructions to rebuild the Go bindings.
## Useage - Developing on the Library
If you want to fork and add new features/functions to this library use the following instructions to rebuild the Go bindings.
Rust functions exposed to the Go binding library are in `./src/lib.rs`.
Rust functions exposed to the Go binding library are in `./src/lib.rs`.
The `build.sh` script in the root of the repository speeds up the task of building and linking the Rust and Go code.
* if want to quickly recompile your code run it as-is with `./build.sh`
@@ -29,19 +22,18 @@ The `build.sh` script in the root of the repository speeds up the task of buildi
> Make sure to run the script from the root of the project directory, and that your LD PATH is set first!
> ```
> RUST_BINARIES=target/release
> echo 'export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:'${RUST_BINARIES} >> ~/.zshrc
> source ~/.zshrc
> RUST_BINARIES=../../../target/release
> echo 'export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:'${RUST_BINARIES} >> ~/.zshrc
> source ~/.zshrc
> ```
This script will:
* (optionally if called with `clean` argument) remove existing Rust and Go artifacts
* build `lib.rs` with the `--release` flag
* compile the Go bindings
* compile the Go bindings
**WIP** you need to manually add the following `cgo` flags to the generated bindings immediately underneath LN3 (`// #include <bindings.h`). In the future this will be automated in `build.sh`:
**WIP** you need to manually add the following `cgo` flags to the generated bindings immediately underneath LN3 (`// #include <bindings.h`). In the future this will be automated in `build.sh`:
```
// #cgo LDFLAGS: -L../../target/release -lnym_go_ffi
// #cgo LDFLAGS: -L../../../../../target/release -lnym_go_ffi
```
-1
View File
@@ -1,4 +1,3 @@
fn main() {
uniffi::generate_scaffolding("src/bindings.udl").unwrap();
}
+1 -8
View File
@@ -15,14 +15,7 @@ build_artifacts() {
printf "building go bindings \n"
uniffi-bindgen-go $UDL_PATH --out-dir $GO_DIR
printf "bindings built \n\n"
# something not right with these - having to add it manually to bindings.go for the moment
# pushd $GO_DIR/bindings
# echo $(pwd)
# LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-}:../../target/release" \
# CGO_LDFLAGS="-L../target/release -lnym_go_ffi -lm -ldl" \
# CGO_ENABLED=1 \
# go run ../main.go
# TODO pull in auto binding from https://github.com/NordSecurity/uniffi-bindgen-go/blob/main/test_bindings.sh (removes need for manual addition of cgo flags)
}
clean_artifacts() {
+9
View File
@@ -6,6 +6,15 @@ import (
"time"
)
/*
Flow showing:
- setting up Nym client logging
- creating an ephemeral Nym client (no key storage / persistent address - this will come in a future iteration)
- getting its [Nym address](https://nymtech.net/docs/clients/addressing-system.html)
- using that address to send a message to yourself via the Mixnet
- listen for and parse the incoming message for the `sender_tag` used for [anonymous replies with SURBs] (https://nymtech.net/docs/architecture/traffic-flow.html#private-replies-using-surbs)
- send a reply to yourself using SURBs
*/
func main() {
// initialise Nym client logging - this is quite verbose but very informative
+289 -30
View File
@@ -1,7 +1,7 @@
package bindings
// #include <bindings.h>
// #cgo LDFLAGS: -L../../target/release -lnym_go_ffi
// #cgo LDFLAGS: -L../../../../../target/release -lnym_go_ffi
import "C"
import (
@@ -379,6 +379,42 @@ func uniffiCheckChecksums() {
panic("bindings: uniffi_nym_go_ffi_checksum_func_listen_for_incoming: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_new_proxy_client(uniffiStatus)
})
if checksum != 14386 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_new_proxy_client: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_new_proxy_client_default(uniffiStatus)
})
if checksum != 23215 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_new_proxy_client_default: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_new_proxy_server(uniffiStatus)
})
if checksum != 40789 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_new_proxy_server: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_proxy_server_address(uniffiStatus)
})
if checksum != 1079 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_proxy_server_address: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_reply(uniffiStatus)
@@ -388,6 +424,24 @@ func uniffiCheckChecksums() {
panic("bindings: uniffi_nym_go_ffi_checksum_func_reply: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_run_proxy_client(uniffiStatus)
})
if checksum != 45441 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_run_proxy_client: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_run_proxy_server(uniffiStatus)
})
if checksum != 57536 {
// If this happens try cleaning and rebuilding your project
panic("bindings: uniffi_nym_go_ffi_checksum_func_run_proxy_server: UniFFI API checksum mismatch")
}
}
{
checksum := rustCall(func(uniffiStatus *C.RustCallStatus) C.uint16_t {
return C.uniffi_nym_go_ffi_checksum_func_send_message(uniffiStatus)
@@ -399,6 +453,30 @@ func uniffiCheckChecksums() {
}
}
type FfiConverterUint64 struct{}
var FfiConverterUint64INSTANCE = FfiConverterUint64{}
func (FfiConverterUint64) Lower(value uint64) C.uint64_t {
return C.uint64_t(value)
}
func (FfiConverterUint64) Write(writer io.Writer, value uint64) {
writeUint64(writer, value)
}
func (FfiConverterUint64) Lift(value C.uint64_t) uint64 {
return uint64(value)
}
func (FfiConverterUint64) Read(reader io.Reader) uint64 {
return readUint64(reader)
}
type FfiDestroyerUint64 struct{}
func (FfiDestroyerUint64) Destroy(_ uint64) {}
type FfiConverterString struct{}
var FfiConverterStringINSTANCE = FfiConverterString{}
@@ -547,11 +625,15 @@ func (err GoWrapError) Unwrap() error {
// Err* are used for checking error type with `errors.Is`
var ErrGoWrapErrorClientInitError = fmt.Errorf("GoWrapErrorClientInitError")
var ErrGoWrapErrorClientUninitialisedError = fmt.Errorf("GoWrapErrorClientUninitialisedError")
var ErrGoWrapErrorSelfAddrError = fmt.Errorf("GoWrapErrorSelfAddrError")
var ErrGoWrapErrorSendMsgError = fmt.Errorf("GoWrapErrorSendMsgError")
var ErrGoWrapErrorReplyError = fmt.Errorf("GoWrapErrorReplyError")
var ErrGoWrapErrorListenError = fmt.Errorf("GoWrapErrorListenError")
var ErrGoWrapErrorProxyInitError = fmt.Errorf("GoWrapErrorProxyInitError")
var ErrGoWrapErrorProxyRunError = fmt.Errorf("GoWrapErrorProxyRunError")
var ErrGoWrapErrorServerInitError = fmt.Errorf("GoWrapErrorServerInitError")
var ErrGoWrapErrorAddressGetterError = fmt.Errorf("GoWrapErrorAddressGetterError")
var ErrGoWrapErrorServerRunError = fmt.Errorf("GoWrapErrorServerRunError")
// Variant structs
type GoWrapErrorClientInitError struct {
@@ -572,24 +654,6 @@ func (self GoWrapErrorClientInitError) Is(target error) bool {
return target == ErrGoWrapErrorClientInitError
}
type GoWrapErrorClientUninitialisedError struct {
message string
}
func NewGoWrapErrorClientUninitialisedError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorClientUninitialisedError{},
}
}
func (err GoWrapErrorClientUninitialisedError) Error() string {
return fmt.Sprintf("ClientUninitialisedError: %s", err.message)
}
func (self GoWrapErrorClientUninitialisedError) Is(target error) bool {
return target == ErrGoWrapErrorClientUninitialisedError
}
type GoWrapErrorSelfAddrError struct {
message string
}
@@ -662,6 +726,96 @@ func (self GoWrapErrorListenError) Is(target error) bool {
return target == ErrGoWrapErrorListenError
}
type GoWrapErrorProxyInitError struct {
message string
}
func NewGoWrapErrorProxyInitError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorProxyInitError{},
}
}
func (err GoWrapErrorProxyInitError) Error() string {
return fmt.Sprintf("ProxyInitError: %s", err.message)
}
func (self GoWrapErrorProxyInitError) Is(target error) bool {
return target == ErrGoWrapErrorProxyInitError
}
type GoWrapErrorProxyRunError struct {
message string
}
func NewGoWrapErrorProxyRunError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorProxyRunError{},
}
}
func (err GoWrapErrorProxyRunError) Error() string {
return fmt.Sprintf("ProxyRunError: %s", err.message)
}
func (self GoWrapErrorProxyRunError) Is(target error) bool {
return target == ErrGoWrapErrorProxyRunError
}
type GoWrapErrorServerInitError struct {
message string
}
func NewGoWrapErrorServerInitError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorServerInitError{},
}
}
func (err GoWrapErrorServerInitError) Error() string {
return fmt.Sprintf("ServerInitError: %s", err.message)
}
func (self GoWrapErrorServerInitError) Is(target error) bool {
return target == ErrGoWrapErrorServerInitError
}
type GoWrapErrorAddressGetterError struct {
message string
}
func NewGoWrapErrorAddressGetterError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorAddressGetterError{},
}
}
func (err GoWrapErrorAddressGetterError) Error() string {
return fmt.Sprintf("AddressGetterError: %s", err.message)
}
func (self GoWrapErrorAddressGetterError) Is(target error) bool {
return target == ErrGoWrapErrorAddressGetterError
}
type GoWrapErrorServerRunError struct {
message string
}
func NewGoWrapErrorServerRunError() *GoWrapError {
return &GoWrapError{
err: &GoWrapErrorServerRunError{},
}
}
func (err GoWrapErrorServerRunError) Error() string {
return fmt.Sprintf("ServerRunError: %s", err.message)
}
func (self GoWrapErrorServerRunError) Is(target error) bool {
return target == ErrGoWrapErrorServerRunError
}
type FfiConverterTypeGoWrapError struct{}
var FfiConverterTypeGoWrapErrorINSTANCE = FfiConverterTypeGoWrapError{}
@@ -682,15 +836,23 @@ func (c FfiConverterTypeGoWrapError) Read(reader io.Reader) error {
case 1:
return &GoWrapError{&GoWrapErrorClientInitError{message}}
case 2:
return &GoWrapError{&GoWrapErrorClientUninitialisedError{message}}
case 3:
return &GoWrapError{&GoWrapErrorSelfAddrError{message}}
case 4:
case 3:
return &GoWrapError{&GoWrapErrorSendMsgError{message}}
case 5:
case 4:
return &GoWrapError{&GoWrapErrorReplyError{message}}
case 6:
case 5:
return &GoWrapError{&GoWrapErrorListenError{message}}
case 6:
return &GoWrapError{&GoWrapErrorProxyInitError{message}}
case 7:
return &GoWrapError{&GoWrapErrorProxyRunError{message}}
case 8:
return &GoWrapError{&GoWrapErrorServerInitError{message}}
case 9:
return &GoWrapError{&GoWrapErrorAddressGetterError{message}}
case 10:
return &GoWrapError{&GoWrapErrorServerRunError{message}}
default:
panic(fmt.Sprintf("Unknown error code %d in FfiConverterTypeGoWrapError.Read()", errorID))
}
@@ -701,22 +863,67 @@ func (c FfiConverterTypeGoWrapError) Write(writer io.Writer, value *GoWrapError)
switch variantValue := value.err.(type) {
case *GoWrapErrorClientInitError:
writeInt32(writer, 1)
case *GoWrapErrorClientUninitialisedError:
writeInt32(writer, 2)
case *GoWrapErrorSelfAddrError:
writeInt32(writer, 3)
writeInt32(writer, 2)
case *GoWrapErrorSendMsgError:
writeInt32(writer, 4)
writeInt32(writer, 3)
case *GoWrapErrorReplyError:
writeInt32(writer, 5)
writeInt32(writer, 4)
case *GoWrapErrorListenError:
writeInt32(writer, 5)
case *GoWrapErrorProxyInitError:
writeInt32(writer, 6)
case *GoWrapErrorProxyRunError:
writeInt32(writer, 7)
case *GoWrapErrorServerInitError:
writeInt32(writer, 8)
case *GoWrapErrorAddressGetterError:
writeInt32(writer, 9)
case *GoWrapErrorServerRunError:
writeInt32(writer, 10)
default:
_ = variantValue
panic(fmt.Sprintf("invalid error value `%v` in FfiConverterTypeGoWrapError.Write", value))
}
}
type FfiConverterOptionalString struct{}
var FfiConverterOptionalStringINSTANCE = FfiConverterOptionalString{}
func (c FfiConverterOptionalString) Lift(rb RustBufferI) *string {
return LiftFromRustBuffer[*string](c, rb)
}
func (_ FfiConverterOptionalString) Read(reader io.Reader) *string {
if readInt8(reader) == 0 {
return nil
}
temp := FfiConverterStringINSTANCE.Read(reader)
return &temp
}
func (c FfiConverterOptionalString) Lower(value *string) RustBuffer {
return LowerIntoRustBuffer[*string](c, value)
}
func (_ FfiConverterOptionalString) Write(writer io.Writer, value *string) {
if value == nil {
writeInt8(writer, 0)
} else {
writeInt8(writer, 1)
FfiConverterStringINSTANCE.Write(writer, *value)
}
}
type FfiDestroyerOptionalString struct{}
func (_ FfiDestroyerOptionalString) Destroy(value *string) {
if value != nil {
FfiDestroyerString{}.Destroy(*value)
}
}
func GetSelfAddress() (string, error) {
_uniffiRV, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI {
return C.uniffi_nym_go_ffi_fn_func_get_self_address(_uniffiStatus)
@@ -756,6 +963,42 @@ func ListenForIncoming() (IncomingMessage, error) {
}
}
func NewProxyClient(serverAddress string, listenAddress string, listenPort string, closeTimeout uint64, env *string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_new_proxy_client(FfiConverterStringINSTANCE.Lower(serverAddress), FfiConverterStringINSTANCE.Lower(listenAddress), FfiConverterStringINSTANCE.Lower(listenPort), FfiConverterUint64INSTANCE.Lower(closeTimeout), FfiConverterOptionalStringINSTANCE.Lower(env), _uniffiStatus)
return false
})
return _uniffiErr
}
func NewProxyClientDefault(serverAddress string, env *string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_new_proxy_client_default(FfiConverterStringINSTANCE.Lower(serverAddress), FfiConverterOptionalStringINSTANCE.Lower(env), _uniffiStatus)
return false
})
return _uniffiErr
}
func NewProxyServer(upstreamAddress string, configDir string, env *string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_new_proxy_server(FfiConverterStringINSTANCE.Lower(upstreamAddress), FfiConverterStringINSTANCE.Lower(configDir), FfiConverterOptionalStringINSTANCE.Lower(env), _uniffiStatus)
return false
})
return _uniffiErr
}
func ProxyServerAddress() (string, error) {
_uniffiRV, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI {
return C.uniffi_nym_go_ffi_fn_func_proxy_server_address(_uniffiStatus)
})
if _uniffiErr != nil {
var _uniffiDefaultValue string
return _uniffiDefaultValue, _uniffiErr
} else {
return FfiConverterStringINSTANCE.Lift(_uniffiRV), _uniffiErr
}
}
func Reply(recipient []byte, message string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_reply(FfiConverterBytesINSTANCE.Lower(recipient), FfiConverterStringINSTANCE.Lower(message), _uniffiStatus)
@@ -764,6 +1007,22 @@ func Reply(recipient []byte, message string) error {
return _uniffiErr
}
func RunProxyClient() error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_run_proxy_client(_uniffiStatus)
return false
})
return _uniffiErr
}
func RunProxyServer() error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_run_proxy_server(_uniffiStatus)
return false
})
return _uniffiErr
}
func SendMessage(recipient string, message string) error {
_, _uniffiErr := rustCallWithError(FfiConverterTypeGoWrapError{}, func(_uniffiStatus *C.RustCallStatus) bool {
C.uniffi_nym_go_ffi_fn_func_send_message(FfiConverterStringINSTANCE.Lower(recipient), FfiConverterStringINSTANCE.Lower(message), _uniffiStatus)
+58
View File
@@ -84,12 +84,46 @@ RustBuffer uniffi_nym_go_ffi_fn_func_listen_for_incoming(
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_new_proxy_client(
RustBuffer server_address,
RustBuffer listen_address,
RustBuffer listen_port,
uint64_t close_timeout,
RustBuffer env,
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_new_proxy_client_default(
RustBuffer server_address,
RustBuffer env,
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_new_proxy_server(
RustBuffer upstream_address,
RustBuffer config_dir,
RustBuffer env,
RustCallStatus* out_status
);
RustBuffer uniffi_nym_go_ffi_fn_func_proxy_server_address(
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_reply(
RustBuffer recipient,
RustBuffer message,
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_run_proxy_client(
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_run_proxy_server(
RustCallStatus* out_status
);
void uniffi_nym_go_ffi_fn_func_send_message(
RustBuffer recipient,
RustBuffer message,
@@ -411,10 +445,34 @@ uint16_t uniffi_nym_go_ffi_checksum_func_listen_for_incoming(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_new_proxy_client(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_new_proxy_client_default(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_new_proxy_server(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_proxy_server_address(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_reply(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_run_proxy_client(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_run_proxy_server(
RustCallStatus* out_status
);
uint16_t uniffi_nym_go_ffi_checksum_func_send_message(
RustCallStatus* out_status
);
+158
View File
@@ -0,0 +1,158 @@
package main
import (
"bufio"
"fmt"
"net"
"nymffi/go-nym/bindings"
"os"
"time"
)
func runProxyClient() {
run_err := bindings.RunProxyClient()
if run_err != nil {
fmt.Println(run_err)
return
}
}
func runProxyServer() {
run_err := bindings.RunProxyServer()
if run_err != nil {
fmt.Println(run_err)
return
}
}
// connects to the proxy server and listens out for incoming as you would with a normal tcp connection
func startTcpListener() {
ln, err := net.Listen("tcp", ":9000")
if err != nil {
fmt.Println(err)
return
}
for {
conn, err := ln.Accept()
if err != nil {
fmt.Println(err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Server-side tcp received: %s", buf)
_, err = conn.Write(buf)
if err != nil {
fmt.Println(err)
return
}
}
/*
Flow showing:
- setting up Nym client logging
- creating instances of both the NymProxyClient and NymProxyServer
- running both in goroutines to kick off the process
- starting a vanilla tcp listener/echo in a goroutine connected to the ProxyServer instance
- pinging that via a tcp client and waiting for the reply: this is (under the hood) sent anonymously via SURBs - the ProxyServer and 'server-side' tcp listener never know the Nym address or IP of the ProxyClient/'client-side' tcp client.
*/
func main() {
// our mixnet client config file defining which network to use
var env_path = "../../../envs/canary.env"
// the tcp socket our server communicates with - the remote host your client is trying to hit
var upstreamAddress = "127.0.0.1:9000"
// where the keys and persistent storage for SURBs is located (this path will be prepended with the value of $HOME in the rust lib)
var configDir = "/tmp/go-proxy-server-example"
// tcp socket port our proxy client communicates with
var clientPort = "8080"
// timeout for ephemeral client to shutdown connection after sending Close message enum once it has sent all of the other messages (in seconds): this is used by the ProxyServer for session management
var clientTimeout uint64 = 60
bindings.InitLogging()
// checking loading proper env
file, err := os.Open(env_path)
if err != nil {
fmt.Println("Error opening file:", err)
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
fmt.Println(scanner.Text())
}
// init a proxy server
build_serv_err := bindings.NewProxyServer(upstreamAddress, configDir, &env_path)
if build_serv_err != nil {
fmt.Println(build_serv_err)
return
}
// get proxy addr
proxyAddr, get_addr_err := bindings.ProxyServerAddress()
if get_addr_err != nil {
fmt.Println("(Go) Error:", get_addr_err)
return
}
fmt.Println("(Go) server address:")
fmt.Println(proxyAddr)
// run it in a goroutine
go runProxyServer()
// initialise a proxy client
build_err := bindings.NewProxyClient(proxyAddr, "127.0.0.1", clientPort, clientTimeout, &env_path)
if build_err != nil {
fmt.Println(build_err)
return
}
// run it in a goroutine
go runProxyClient()
// connect 'server-side' tcp socket to ProxyServer
go startTcpListener()
// send a oneshot message, wait for the echo, and close. you will see the session uuid info and the fact that the proxy_client logs it will be closing the session in <clientTimeout>.
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
fmt.Println(err)
return
}
_, err = conn.Write([]byte("Hello, server: oneshot ping\n"))
if err != nil {
fmt.Println(err)
return
}
buf := make([]byte, 1024)
_, read_err := conn.Read(buf)
if read_err != nil {
fmt.Println(read_err)
return
}
fmt.Printf("Client-side tcp received: %s", buf)
conn.Close()
// sleep so that the nym client processes can catch up - in reality you'd have another process
// running to keep logging going, so this is only necessary for this reference
time.Sleep(60 * time.Second)
fmt.Println("(Go) end go example")
}

Some files were not shown because too many files have changed in this diff Show More