Compare commits

...

28 Commits

Author SHA1 Message Date
benedetta davico 8dbcd7d07c TEMP gh runner fix 2024-12-03 10:28:27 +01:00
Bogdan-Ștefan Neacşu 60c21a8d1d Fix backwards compat mac generation (#5202) 2024-12-02 19:52:59 +02:00
Jędrzej Stuczyński feefde9022 Bugfix/credential proxy sequencing (#5187)
* using common middleware for all http servers

* improved span handling in credential-proxy

* ensure increase in sequence number upon making deposit

* added explicit connect options for the db

* fixed further instances of incorrect span instrumentation

* batch deposit requests together to improve concurrency

* ignore cancelled requests

* updated credential proxy version to 0.1.4

* adjusted Dockerfile with new binary location

* log binary version on startup

* reduce default log level

* guard against unavaiable commit sha

* apply review comments: dont exit(0), instead just shutdown normally

* add skip_webhook parameter to obtain-async

* removing dead code
2024-12-02 14:52:35 +00:00
benedetta davico 645be5fa22 Update ci-build-upload-binaries.yml 2024-12-02 14:03:44 +01:00
benedetta davico ac56717b23 Update ci-build-upload-binaries.yml 2024-12-02 13:48:05 +01:00
benedetta davico ec502f46f0 Merge pull request #5196 from nymtech/release/2024.13-magura-patched-v2
Merging magura drift into crunch
2024-12-02 12:13:12 +01:00
benedettadavico 4a9a5579c4 update changelog 2024-11-29 14:06:32 +01:00
benedetta davico 96180275f8 Update Cargo.toml 2024-11-29 13:57:57 +01:00
Bogdan-Ștefan Neacşu ab20260a2f Guard storage access with cache (#5193)
* Guard storage access with cache

* Do the sync way less freq

* Change sync behaviour for bandwidth too

* Use bigger delta
2024-11-29 14:56:39 +02:00
Jędrzej Stuczyński 889d464e98 improvement: make internal gateway clients use the same topology cache (#5191) 2024-11-29 09:45:12 +00:00
Jędrzej Stuczyński 56206433e6 chore: apply 1.84 linter suggestions (#5192) 2024-11-29 09:20:45 +00:00
Drazen Urch 8e713d43e1 Add monitor_run and testing_route indexes (#5182) 2024-11-27 11:07:39 +01:00
benedettadavico 35aa7e338d bump binary versions 2024-11-26 15:01:06 +01:00
Jędrzej Stuczyński 2a60b2f057 bugfix: fixed nym-node config migrations (again) (#5179) 2024-11-26 09:41:18 +00:00
Jędrzej Stuczyński dcde4c8df1 bugfix: use default value for verloc config when deserialising missing values (#5177) 2024-11-25 17:54:32 +00:00
Tommy Verrall fcaa32284b Merge pull request #5175 from nymtech/fix/empty_allowed
Remove peers with no allowed ip from storage
2024-11-25 17:36:33 +00:00
Bogdan-Ștefan Neacşu fa72f90bfa Remove peers with no allowed ip from storage 2024-11-25 16:44:23 +00:00
Jędrzej Stuczyński 12b9aefa99 bugfix: correctly expose ecash-related data on nym-api (#5155)
* fixed signer related endpoints

* fixed aggregation of partial data if the api is not a signer

* fixes to swagger docs for global ecash endpoints

* remove unused axum_macros

* fixed test traits
2024-11-25 08:39:55 +00:00
benedetta davico 0041937ed3 Merge pull request #5170 from nymtech/merge/release/2024.13-magura-patched
merge patched magura into develop
2024-11-25 09:06:32 +01:00
Jędrzej Stuczyński 0e5bd966dd Merge branch 'release/2024.13-magura-patched' into develop 2024-11-22 15:53:59 +00:00
Jędrzej Stuczyński 6acd936368 bugfix: additional improvements to nym-api db performance (#5168)
* added statement logging to nym-api db

* adding additional pool options

* dont blow up upon failing to submit network monitor results

* store in-memory cache of node database ids
2024-11-22 15:52:26 +00:00
benedettadavico 01c7b2819e update changelog 2024-11-22 10:50:59 +01:00
Simon Wicky 042a8a58aa start session collection for exit gateways (#5148) (#5161) 2024-11-22 09:12:01 +01:00
benedettadavico d8ab2a8f15 update version for clients and nym-node 2024-11-22 07:25:54 +01:00
Jędrzej Stuczyński d6d2239685 implement optional cap for number of ack retransmissions (#5158)
* implement optional cap for number of ack retransmissions

* Wasm
2024-11-21 18:57:24 +00:00
Jędrzej Stuczyński 3d704fbbf1 change: make nym-api optionally ignore nodes with illegal ip addresses, like loopback (#5159) 2024-11-21 18:54:14 +00:00
Jędrzej Stuczyński 119c36b0bb added 'deterministic_route_selection' for sphinx packet route selection (#5157)
* added 'deterministic_route_selection' for sphinx packet route selection

* clippy + wasm

* Switch to ChaCha8Rng for deterministic route generation

---------

Co-authored-by: durch <durch@users.noreply.github.com>
2024-11-21 18:21:01 +00:00
Jędrzej Stuczyński 6b5b97199b fix: tracing logger to output to stderr 2024-11-19 17:47:13 +00:00
134 changed files with 1676 additions and 920 deletions
@@ -21,7 +21,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-ubuntu-20.04 ]
platform: [ arc-ubuntu-22.04 ]
runs-on: ${{ matrix.platform }}
env:
+10
View File
@@ -4,6 +4,16 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.13-magura-drift] (2024-11-29)
- Optimised syncing bandwidth information to storage
## [2024.13-magura-patched] (2024-11-22)
- [experimental] allow clients to change between deterministic route selection based on packet headers and a pseudorandom distribution
- Introduced a configurable limit on retransmission frequency of packets if ACKs are not received
- Filtered out invalid IP addresses on nym-api
## [2024.13-magura] (2024-11-18)
- Limit race probability ([#5145])
Generated
+15 -9
View File
@@ -2428,7 +2428,7 @@ dependencies = [
[[package]]
name = "explorer-api"
version = "1.1.42"
version = "1.1.43"
dependencies = [
"chrono",
"clap 4.5.20",
@@ -4451,7 +4451,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "nym-api"
version = "1.1.46"
version = "1.1.47"
dependencies = [
"anyhow",
"async-trait",
@@ -4470,6 +4470,7 @@ dependencies = [
"cw2",
"cw3",
"cw4",
"dashmap",
"dirs",
"futures",
"getset",
@@ -4700,7 +4701,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.44"
version = "1.1.45"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -4781,7 +4782,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.43"
version = "1.1.45"
dependencies = [
"bs58",
"clap 4.5.20",
@@ -5092,7 +5093,7 @@ dependencies = [
[[package]]
name = "nym-credential-proxy"
version = "0.1.3"
version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
@@ -5112,6 +5113,7 @@ dependencies = [
"nym-credentials",
"nym-credentials-interface",
"nym-crypto",
"nym-ecash-contract-common",
"nym-http-api-common",
"nym-network-defaults",
"nym-validator-client",
@@ -5619,12 +5621,15 @@ dependencies = [
"axum-client-ip",
"bytes",
"colored",
"futures",
"mime",
"serde",
"serde_json",
"serde_yaml",
"tower 0.4.13",
"tracing",
"utoipa",
"zeroize",
]
[[package]]
@@ -5898,7 +5903,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.44"
version = "1.1.46"
dependencies = [
"addr",
"anyhow",
@@ -5949,7 +5954,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.1.10"
version = "1.1.12"
dependencies = [
"anyhow",
"bip39",
@@ -6305,7 +6310,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.43"
version = "1.1.45"
dependencies = [
"bs58",
"clap 4.5.20",
@@ -6885,6 +6890,7 @@ dependencies = [
"nym-task",
"nym-wireguard-types",
"thiserror",
"time",
"tokio",
"tokio-stream",
"x25519-dalek",
@@ -6907,7 +6913,7 @@ dependencies = [
[[package]]
name = "nymvisor"
version = "0.1.9"
version = "0.1.10"
dependencies = [
"anyhow",
"bytes",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.43"
version = "1.1.45"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.43"
version = "1.1.45"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
@@ -17,7 +17,7 @@ use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::EcashSigningClient;
use nym_validator_client::nyxd::contract_traits::{DkgQueryClient, EcashQueryClient};
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::EcashApiClient;
use rand::rngs::OsRng;
+10 -1
View File
@@ -393,13 +393,20 @@ pub struct Traffic {
/// poisson distribution.
pub disable_main_poisson_packet_distribution: bool,
/// Specify whether route selection should be determined by the packet header.
pub deterministic_route_selection: bool,
/// Specify how many times particular packet can be retransmitted
/// None - no limit
pub maximum_number_of_retransmissions: Option<u32>,
/// Specifies the packet size used for sent messages.
/// Do not override it unless you understand the consequences of that change.
pub primary_packet_size: PacketSize,
/// Specifies the optional auxiliary packet size for optimizing message streams.
/// Note that its use decreases overall anonymity.
/// Do not set it it unless you understand the consequences of that change.
/// Do not set it unless you understand the consequences of that change.
pub secondary_packet_size: Option<PacketSize>,
pub packet_type: PacketType,
@@ -424,6 +431,8 @@ impl Default for Traffic {
average_packet_delay: DEFAULT_AVERAGE_PACKET_DELAY,
message_sending_average_delay: DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY,
disable_main_poisson_packet_distribution: false,
deterministic_route_selection: false,
maximum_number_of_retransmissions: None,
primary_packet_size: PacketSize::RegularPacket,
secondary_packet_size: None,
packet_type: PacketType::Mix,
@@ -111,6 +111,7 @@ impl From<ConfigV5> for Config {
primary_packet_size: value.debug.traffic.primary_packet_size,
secondary_packet_size: value.debug.traffic.secondary_packet_size,
packet_type: value.debug.traffic.packet_type,
..Default::default()
},
cover_traffic: CoverTraffic {
loop_cover_traffic_average_delay: value
@@ -30,7 +30,8 @@ pub(crate) enum Action {
InsertPending(Vec<PendingAcknowledgement>),
/// Removes given `PendingAcknowledgement` from the 'shared' state. Also cancels the retransmission timer.
/// Initiated by `AcknowledgementListener`
/// Initiated by `AcknowledgementListener` upon receiving the acknowledgement. Also by `RetransmissionRequestListener`
/// upon deciding to abandon the data.
RemovePending(FragmentIdentifier),
/// Starts the retransmission timer on given `PendingAcknowledgement` with the `Duration` based on
@@ -41,7 +42,7 @@ pub(crate) enum Action {
/// Updates the expected delay of given `PendingAcknowledgement` with the new provided `SphinxDelay`.
/// Initiated by `RetransmissionRequestListener`
UpdateDelay(FragmentIdentifier, SphinxDelay),
UpdatePendingAck(FragmentIdentifier, SphinxDelay),
}
impl Action {
@@ -57,8 +58,8 @@ impl Action {
Action::StartTimer(frag_id)
}
pub(crate) fn new_update_delay(frag_id: FragmentIdentifier, delay: SphinxDelay) -> Self {
Action::UpdateDelay(frag_id, delay)
pub(crate) fn new_update_pending_ack(frag_id: FragmentIdentifier, delay: SphinxDelay) -> Self {
Action::UpdatePendingAck(frag_id, delay)
}
}
@@ -135,7 +136,7 @@ impl ActionController {
}
fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) {
trace!("{} is starting its timer", frag_id);
trace!("{frag_id} is starting its timer");
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
// the fact that this branch is now POSSIBLE is a sign of a need to refactor this whole
@@ -193,7 +194,7 @@ impl ActionController {
// initiated basically as a first step of retransmission. At first data has its delay updated
// (as new sphinx packet was created with new expected delivery time)
fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
fn handle_update_pending_ack(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
trace!("{} is updating its delay", frag_id);
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
@@ -202,7 +203,7 @@ impl ActionController {
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_delay(delay);
inner_data.update_retransmitted(delay);
self.pending_acks_data
.insert(frag_id, (Arc::new(inner_data), queue_key));
@@ -225,7 +226,7 @@ impl ActionController {
// about it. Perhaps just reschedule it at later point?
let frag_id = expired_ack.into_inner();
trace!("{} has expired", frag_id);
trace!("{frag_id} has expired");
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
if queue_key.is_none() {
@@ -258,7 +259,9 @@ impl ActionController {
Action::InsertPending(pending_acks) => self.handle_insert(pending_acks),
Action::RemovePending(frag_id) => self.handle_remove(frag_id),
Action::StartTimer(frag_id) => self.handle_start_timer(frag_id),
Action::UpdateDelay(frag_id, delay) => self.handle_update_delay(frag_id, delay),
Action::UpdatePendingAck(frag_id, delay) => {
self.handle_update_pending_ack(frag_id, delay)
}
}
}
@@ -71,6 +71,7 @@ pub(crate) struct PendingAcknowledgement {
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
impl PendingAcknowledgement {
@@ -86,6 +87,7 @@ impl PendingAcknowledgement {
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -105,6 +107,7 @@ impl PendingAcknowledgement {
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -116,8 +119,9 @@ impl PendingAcknowledgement {
self.message_chunk.clone()
}
fn update_delay(&mut self, new_delay: SphinxDelay) {
fn update_retransmitted(&mut self, new_delay: SphinxDelay) {
self.delay = new_delay;
self.retransmissions += 1;
}
}
@@ -163,6 +167,9 @@ impl AcknowledgementControllerConnectors {
/// Configurable parameters of the `AcknowledgementController`
pub(super) struct Config {
/// Specify how many times particular packet can be retransmitted
maximum_retransmissions: Option<u32>,
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
ack_wait_addition: Duration,
@@ -174,8 +181,13 @@ pub(super) struct Config {
}
impl Config {
pub(super) fn new(ack_wait_addition: Duration, ack_wait_multiplier: f64) -> Self {
pub(super) fn new(
maximum_retransmissions: Option<u32>,
ack_wait_addition: Duration,
ack_wait_multiplier: f64,
) -> Self {
Config {
maximum_retransmissions,
ack_wait_addition,
ack_wait_multiplier,
packet_size: Default::default(),
@@ -238,6 +250,7 @@ where
// will listen for any ack timeouts and trigger retransmission
let retransmission_request_listener = RetransmissionRequestListener::new(
config.maximum_retransmissions,
connectors.ack_action_sender.clone(),
message_handler,
retransmission_rx,
@@ -20,6 +20,7 @@ use std::sync::{Arc, Weak};
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R> {
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
@@ -31,12 +32,14 @@ where
R: CryptoRng + Rng,
{
pub(super) fn new(
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
action_sender,
message_handler,
request_receiver,
@@ -77,6 +80,18 @@ where
}
};
let frag_id = timed_out_ack.message_chunk.fragment_identifier();
if let Some(limit) = self.maximum_retransmissions {
if timed_out_ack.retransmissions >= limit {
warn!("reached maximum number of allowed retransmissions for the packet");
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
return;
}
}
let maybe_prepared_fragment = match &timed_out_ack.destination {
PacketDestination::Anonymous {
recipient_tag,
@@ -101,8 +116,6 @@ where
}
};
let frag_id = timed_out_ack.message_chunk.fragment_identifier();
let prepared_fragment = match maybe_prepared_fragment {
Ok(prepared_fragment) => prepared_fragment,
Err(err) => {
@@ -136,7 +149,7 @@ where
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
self.action_sender
.unbounded_send(Action::new_update_delay(frag_id, new_delay))
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
.unwrap();
// send to `OutQueueControl` to eventually send to the mix network
@@ -91,6 +91,9 @@ pub(crate) struct Config {
/// and surb-based are going to be sent.
sender_address: Recipient,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
average_packet_delay: Duration,
@@ -114,10 +117,12 @@ impl Config {
sender_address: Recipient,
average_packet_delay: Duration,
average_ack_delay: Duration,
deterministic_route_selection: bool,
) -> Self {
Config {
ack_key,
sender_address,
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
@@ -176,6 +181,7 @@ where
{
let message_preparer = MessagePreparer::new(
rng,
config.deterministic_route_selection,
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
@@ -634,7 +640,7 @@ where
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
self.action_sender
.unbounded_send(Action::UpdateDelay(id, new_delay))
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
.expect("action control task has died")
}
@@ -65,6 +65,7 @@ pub struct Config {
impl<'a> From<&'a Config> for acknowledgement_control::Config {
fn from(cfg: &'a Config) -> Self {
acknowledgement_control::Config::new(
cfg.traffic.maximum_number_of_retransmissions,
cfg.acks.ack_wait_addition,
cfg.acks.ack_wait_multiplier,
)
@@ -97,6 +98,7 @@ impl<'a> From<&'a Config> for message_handler::Config {
cfg.self_recipient,
cfg.traffic.average_packet_delay,
cfg.acks.average_ack_delay,
cfg.traffic.deterministic_route_selection,
)
.with_custom_primary_packet_size(cfg.traffic.primary_packet_size)
.with_custom_secondary_packet_size(cfg.traffic.secondary_packet_size)
@@ -38,7 +38,7 @@ pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, Option<NymTopology>>,
}
impl<'a> Deref for TopologyReadPermit<'a> {
impl Deref for TopologyReadPermit<'_> {
type Target = Option<NymTopology>;
fn deref(&self) -> &Self::Target {
@@ -32,7 +32,7 @@ impl Div<GasPrice> for Coin {
}
}
impl<'a> Div<GasPrice> for &'a Coin {
impl Div<GasPrice> for &Coin {
type Output = Gas;
fn div(self, rhs: GasPrice) -> Self::Output {
@@ -13,6 +13,44 @@ use tracing::error;
pub use cosmrs::abci::MsgResponse;
pub fn parse_singleton_u32_from_contract_response(b: Vec<u8>) -> Result<u32, NyxdError> {
if b.len() != 4 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 4,
});
}
Ok(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}
pub fn parse_singleton_u64_from_contract_response(b: Vec<u8>) -> Result<u64, NyxdError> {
if b.len() != 8 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 8,
});
}
Ok(u64::from_be_bytes([
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
#[derive(Debug, Clone)]
pub struct ParsedContractResponse {
pub message_index: usize,
pub response: Vec<u8>,
}
impl ParsedContractResponse {
pub fn parse_singleton_u32_contract_data(self) -> Result<u32, NyxdError> {
parse_singleton_u32_from_contract_response(self.response)
}
pub fn parse_singleton_u64_contract_data(self) -> Result<u64, NyxdError> {
parse_singleton_u64_from_contract_response(self.response)
}
}
pub fn parse_msg_responses(data: Bytes) -> Vec<MsgResponse> {
// it seems that currently, on wasmd 0.43 + tendermint-rs 0.37 + cosmrs 0.17.0-pre
// the data is left in undecoded base64 form, but I'd imagine this might change so if the decoding fails,
@@ -34,35 +72,25 @@ pub fn parse_msg_responses(data: Bytes) -> Vec<MsgResponse> {
}
// requires there's a single response message
pub trait ToSingletonContractData: Sized {
pub trait ContractResponseData: Sized {
fn parse_singleton_u32_contract_data(&self) -> Result<u32, NyxdError> {
let b = self.to_singleton_contract_data()?;
if b.len() != 4 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 4,
});
}
Ok(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
parse_singleton_u32_from_contract_response(b)
}
fn parse_singleton_u64_contract_data(&self) -> Result<u64, NyxdError> {
let b = self.to_singleton_contract_data()?;
if b.len() != 8 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 8,
});
}
Ok(u64::from_be_bytes([
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
parse_singleton_u64_from_contract_response(b)
}
fn to_singleton_contract_data(&self) -> Result<Vec<u8>, NyxdError>;
fn to_unchecked_contract_data(&self) -> Result<Vec<Vec<u8>>, NyxdError>;
fn to_contract_data(&self) -> Result<Vec<ParsedContractResponse>, NyxdError>;
}
impl ToSingletonContractData for ExecuteResult {
impl ContractResponseData for ExecuteResult {
fn to_singleton_contract_data(&self) -> Result<Vec<u8>, NyxdError> {
if self.msg_responses.len() != 1 {
return Err(NyxdError::UnexpectedNumberOfMsgResponses {
@@ -72,6 +100,30 @@ impl ToSingletonContractData for ExecuteResult {
self.msg_responses[0].to_contract_response_data()
}
fn to_unchecked_contract_data(&self) -> Result<Vec<Vec<u8>>, NyxdError> {
self.msg_responses
.iter()
.map(ToContractResponseData::to_contract_response_data)
.collect()
}
fn to_contract_data(&self) -> Result<Vec<ParsedContractResponse>, NyxdError> {
let mut response = Vec::new();
for (message_index, msg) in self.msg_responses.iter().enumerate() {
// unfortunately `Name` trait has not been derived for `MsgExecuteContractResponse`,
// so we have to make an explicit string comparison instead
if msg.type_url == "/cosmwasm.wasm.v1.MsgExecuteContractResponse" {
response.push(ParsedContractResponse {
message_index,
response: msg.to_contract_response_data()?,
})
}
}
Ok(response)
}
}
pub trait ToContractResponseData: Sized {
@@ -23,7 +23,7 @@ use tendermint_rpc::endpoint::*;
use tendermint_rpc::query::Query;
use tendermint_rpc::{Error as TendermintRpcError, Order, Paging, SimpleRequest};
pub use helpers::{ToContractResponseData, ToSingletonContractData};
pub use helpers::{ContractResponseData, ToContractResponseData};
#[cfg(feature = "http-client")]
use crate::http_client;
@@ -22,7 +22,7 @@ pub struct GasPrice {
pub denom: String,
}
impl<'a> Mul<Gas> for &'a GasPrice {
impl Mul<Gas> for &GasPrice {
type Output = Coin;
fn mul(self, gas_limit: Gas) -> Self::Output {
@@ -32,7 +32,7 @@ pub(crate) mod string_rfc3339_offset_date_time {
struct Rfc3339OffsetDateTimeVisitor;
impl<'de> Visitor<'de> for Rfc3339OffsetDateTimeVisitor {
impl Visitor<'_> for Rfc3339OffsetDateTimeVisitor {
type Value = OffsetDateTime;
fn expecting(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
@@ -111,7 +111,7 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
}
#[instrument(level = "trace", skip_all)]
async fn sync_storage_bandwidth(&mut self) -> Result<()> {
pub async fn sync_storage_bandwidth(&mut self) -> Result<()> {
trace!("syncing client bandwidth with the underlying storage");
let updated = self
.storage
@@ -8,8 +8,8 @@ use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5);
const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB
const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_secs(5 * 60); // 5 minutes
const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 5 * 1024 * 1024; // 5MB
#[derive(Debug, Clone, Copy)]
pub struct BandwidthFlushingBehaviourConfig {
@@ -18,7 +18,7 @@ use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::{
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
};
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::nyxd::cw3::Status;
use nym_validator_client::nyxd::AccountId;
use nym_validator_client::EcashApiClient;
+1 -2
View File
@@ -26,9 +26,8 @@ const PARALLEL_RUNS: usize = 32;
/// `lambda` ($\lambda$) in the DKG paper
const SECURITY_PARAMETER: usize = 256;
// note: ceiling in integer division can be achieved via q = (x + y - 1) / y;
/// ceil(SECURITY_PARAMETER / PARALLEL_RUNS) in the paper
const NUM_CHALLENGE_BITS: usize = (SECURITY_PARAMETER + PARALLEL_RUNS - 1) / PARALLEL_RUNS;
const NUM_CHALLENGE_BITS: usize = SECURITY_PARAMETER.div_ceil(PARALLEL_RUNS);
// type alias for ease of use
type FirstChallenge = Vec<Vec<Vec<u64>>>;
+3 -3
View File
@@ -196,7 +196,7 @@ impl<'b> Add<&'b Polynomial> for Polynomial {
}
}
impl<'a> Add<Polynomial> for &'a Polynomial {
impl Add<Polynomial> for &Polynomial {
type Output = Polynomial;
fn add(self, rhs: Polynomial) -> Polynomial {
@@ -212,10 +212,10 @@ impl Add<Polynomial> for Polynomial {
}
}
impl<'a, 'b> Add<&'b Polynomial> for &'a Polynomial {
impl<'a> Add<&'a Polynomial> for &Polynomial {
type Output = Polynomial;
fn add(self, rhs: &'b Polynomial) -> Self::Output {
fn add(self, rhs: &'a Polynomial) -> Self::Output {
let len = self.coefficients.len();
let rhs_len = rhs.coefficients.len();
@@ -37,7 +37,7 @@ pub struct GatewayHandshake<'a> {
handshake_future: BoxFuture<'a, Result<SharedGatewayKey, HandshakeError>>,
}
impl<'a> Future for GatewayHandshake<'a> {
impl Future for GatewayHandshake<'_> {
type Output = Result<SharedGatewayKey, HandshakeError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+3
View File
@@ -15,12 +15,15 @@ axum-client-ip.workspace = true
axum.workspace = true
bytes = { workspace = true }
colored.workspace = true
futures = { workspace = true }
mime = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_yaml = { workspace = true }
tower = { workspace = true }
tracing.workspace = true
utoipa = { workspace = true, optional = true }
zeroize = { workspace = true }
[features]
utoipa = ["dep:utoipa"]
+1 -1
View File
@@ -7,7 +7,7 @@ use axum::Json;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
pub mod logging;
pub mod middleware;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -1,5 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::IntoResponse;
@@ -1,5 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use axum::extract::Request;
use axum::http::header::{HOST, USER_AGENT};
@@ -11,6 +11,7 @@ use colored::Colorize;
use std::time::Instant;
use tracing::info;
/// Simple logger for requests
pub async fn logger(
InsecureClientIp(addr): InsecureClientIp,
request: Request,
@@ -0,0 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod bearer_auth;
pub mod logging;
+14 -4
View File
@@ -29,6 +29,9 @@ pub struct NodeTester<R> {
packet_size: PacketSize,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
average_packet_delay: Duration,
@@ -48,11 +51,13 @@ impl<R> NodeTester<R>
where
R: Rng + CryptoRng,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
rng: R,
base_topology: NymTopology,
self_address: Option<Recipient>,
packet_size: PacketSize,
deterministic_route_selection: bool,
average_packet_delay: Duration,
average_ack_delay: Duration,
ack_key: Arc<AckKey>,
@@ -62,6 +67,7 @@ where
base_topology,
self_address,
packet_size,
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
@@ -289,10 +295,18 @@ where
impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
type Rng = R;
fn deterministic_route_selection(&self) -> bool {
self.deterministic_route_selection
}
fn rng(&mut self) -> &mut Self::Rng {
&mut self.rng
}
fn nonce(&self) -> i32 {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
@@ -304,8 +318,4 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
fn average_ack_delay(&self) -> Duration {
self.average_ack_delay
}
fn nonce(&self) -> i32 {
1
}
}
@@ -324,18 +324,6 @@ pub fn unchecked_aggregate_indices_signatures(
_aggregate_indices_signatures(params, vk, signatures_shares, false)
}
/// Generates parameters for the scheme setup.
///
/// # Arguments
///
/// * `total_coins` - it is the number of coins in a freshly generated wallet. It is the public parameter of the scheme.
///
/// # Returns
///
/// A `Parameters` struct containing group parameters, public key, the number of signatures (`total_coins`),
/// and a map of signatures for each index `l`.
///
#[cfg(test)]
mod tests {
use super::*;
@@ -264,7 +264,7 @@ impl<'b> Add<&'b VerificationKeyAuth> for VerificationKeyAuth {
}
}
impl<'a> Mul<Scalar> for &'a VerificationKeyAuth {
impl Mul<Scalar> for &VerificationKeyAuth {
type Output = VerificationKeyAuth;
#[inline]
@@ -984,7 +984,7 @@ pub struct SerialNumberRef<'a> {
pub(crate) inner: &'a [G1Projective],
}
impl<'a> SerialNumberRef<'a> {
impl SerialNumberRef<'_> {
pub fn to_bytes(&self) -> Vec<u8> {
let ss_len = self.inner.len();
let mut bytes: Vec<u8> = Vec::with_capacity(ss_len * 48);
+2 -2
View File
@@ -206,10 +206,10 @@ impl Deref for PublicKey {
}
}
impl<'a, 'b> Mul<&'b Scalar> for &'a PublicKey {
impl<'a> Mul<&'a Scalar> for &PublicKey {
type Output = G1Projective;
fn mul(self, rhs: &'b Scalar) -> Self::Output {
fn mul(self, rhs: &'a Scalar) -> Self::Output {
self.0 * rhs
}
}
+1 -1
View File
@@ -305,7 +305,7 @@ impl<'b> Add<&'b VerificationKey> for VerificationKey {
}
}
impl<'a> Mul<Scalar> for &'a VerificationKey {
impl Mul<Scalar> for &VerificationKey {
type Output = VerificationKey;
#[inline]
+1 -1
View File
@@ -64,7 +64,7 @@ impl<'de> Deserialize<'de> for Recipient {
{
struct RecipientVisitor;
impl<'de> Visitor<'de> for RecipientVisitor {
impl Visitor<'_> for RecipientVisitor {
type Value = Recipient;
fn expecting(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
+6 -7
View File
@@ -1,6 +1,12 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Encodoing and decoding node routing information.
//!
//! This module is responsible for encoding and decoding node routing information, so that
//! they could be later put into an appropriate field in a sphinx header.
//! Currently, that routing information is an IP address, but in principle it can be anything
//! for as long as it's going to fit in the field.
use nym_crypto::asymmetric::identity;
use nym_sphinx_types::{NodeAddressBytes, NODE_ADDRESS_LENGTH};
@@ -12,13 +18,6 @@ use thiserror::Error;
pub type NodeIdentity = identity::PublicKey;
pub const NODE_IDENTITY_SIZE: usize = identity::PUBLIC_KEY_LENGTH;
/// Encodoing and decoding node routing information.
///
/// This module is responsible for encoding and decoding node routing information, so that
/// they could be later put into an appropriate field in a sphinx header.
/// Currently, that routing information is an IP address, but in principle it can be anything
/// for as long as it's going to fit in the field.
/// MAX_UNPADDED_LEN represents maximum length an unpadded address could have.
/// In this case it's an ipv6 socket address (with version prefix)
pub const MAX_NODE_ADDRESS_UNPADDED_LEN: usize = 19;
@@ -56,7 +56,7 @@ impl<'de> Deserialize<'de> for ReplySurb {
{
struct ReplySurbVisitor;
impl<'de> Visitor<'de> for ReplySurbVisitor {
impl Visitor<'_> for ReplySurbVisitor {
type Value = ReplySurb;
fn expecting(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
+28 -10
View File
@@ -18,7 +18,7 @@ use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX
use nym_sphinx_types::{Delay, NymPacket};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
use rand_chacha::ChaCha8Rng;
use nym_sphinx_chunking::monitoring;
use std::time::Duration;
@@ -51,6 +51,7 @@ impl From<PreparedFragment> for MixPacket {
pub trait FragmentPreparer {
type Rng: CryptoRng + Rng;
fn deterministic_route_selection(&self) -> bool;
fn rng(&mut self) -> &mut Self::Rng;
fn nonce(&self) -> i32;
fn num_mix_hops(&self) -> u8;
@@ -201,9 +202,7 @@ pub trait FragmentPreparer {
// could perform diffie-hellman with its own keys followed by a kdf to re-derive
// the packet encryption key
let seed = fragment.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha20Rng::seed_from_u64(seed as u64);
let fragment_header = fragment.header();
let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);
@@ -241,8 +240,18 @@ pub trait FragmentPreparer {
};
// generate pseudorandom route for the packet
log::trace!("Preparing chunk for sending with {} mix hops", hops);
let route = topology.random_route_to_gateway(&mut rng, hops, destination)?;
log::trace!("Preparing chunk for sending with {hops} mix hops");
let route = if self.deterministic_route_selection() {
log::trace!("using deterministic route selection");
let seed = fragment_header.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
topology.random_route_to_gateway(&mut rng, hops, destination)?
} else {
log::trace!("using pseudorandom route selection");
let mut rng = self.rng();
topology.random_route_to_gateway(&mut rng, hops, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
// including set of delays
@@ -313,6 +322,9 @@ pub struct MessagePreparer<R> {
/// Instance of a cryptographically secure random number generator.
rng: R,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Address of this client which also represent an address to which all acknowledgements
/// and surb-based are going to be sent.
sender_address: Recipient,
@@ -336,6 +348,7 @@ where
{
pub fn new(
rng: R,
deterministic_route_selection: bool,
sender_address: Recipient,
average_packet_delay: Duration,
average_ack_delay: Duration,
@@ -344,6 +357,7 @@ where
let nonce = rng.gen();
MessagePreparer {
rng,
deterministic_route_selection,
sender_address,
average_packet_delay,
average_ack_delay,
@@ -457,10 +471,18 @@ where
impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
type Rng = R;
fn deterministic_route_selection(&self) -> bool {
self.deterministic_route_selection
}
fn rng(&mut self) -> &mut Self::Rng {
&mut self.rng
}
fn nonce(&self) -> i32 {
self.nonce
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
@@ -472,10 +494,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
fn average_ack_delay(&self) -> Duration {
self.average_ack_delay
}
fn nonce(&self) -> i32 {
self.nonce
}
}
/*
-10
View File
@@ -253,25 +253,15 @@ impl Socks5RequestContent {
/// Deserialize the request type, connection id, destination address and port,
/// and the request body from bytes.
///
// TODO: this was already inaccurate
// /// Serialized bytes looks like this:
// ///
// /// --------------------------------------------------------------------------------------
// /// request_flag | connection_id | address_length | remote_address_bytes | request_data |
// /// 1 | 8 | 2 | address_length | ... |
// /// --------------------------------------------------------------------------------------
///
/// The request_flag tells us whether this is a new connection request (`new_connect`),
/// an already-established connection we should send up (`new_send`), or
/// a request to close an established connection (`new_close`).
// connect:
// RequestFlag::Connect || CONN_ID || ADDR_LEN || ADDR || <RETURN_ADDR>
//
// send:
// RequestFlag::Send || CONN_ID || LOCAL_CLOSED || DATA
// where DATA: SEQ || TRUE_DATA
pub fn try_from_bytes(b: &[u8]) -> Result<Socks5RequestContent, RequestDeserializationError> {
// each request needs to at least contain flag and ConnectionId
if b.is_empty() {
+11
View File
@@ -162,6 +162,13 @@ pub struct TrafficWasm {
/// a loop cover message is sent instead in order to preserve the rate.
pub message_sending_average_delay_ms: u32,
/// Specify how many times particular packet can be retransmitted
/// None - no limit
pub maximum_number_of_retransmissions: Option<u32>,
/// Specify whether route selection should be determined by the packet header.
pub deterministic_route_selection: bool,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
pub disable_main_poisson_packet_distribution: bool,
@@ -196,6 +203,8 @@ impl From<TrafficWasm> for ConfigTraffic {
message_sending_average_delay: Duration::from_millis(
traffic.message_sending_average_delay_ms as u64,
),
deterministic_route_selection: traffic.deterministic_route_selection,
maximum_number_of_retransmissions: traffic.maximum_number_of_retransmissions,
disable_main_poisson_packet_distribution: traffic
.disable_main_poisson_packet_distribution,
primary_packet_size: PacketSize::RegularPacket,
@@ -211,6 +220,8 @@ impl From<ConfigTraffic> for TrafficWasm {
average_packet_delay_ms: traffic.average_packet_delay.as_millis() as u32,
message_sending_average_delay_ms: traffic.message_sending_average_delay.as_millis()
as u32,
deterministic_route_selection: traffic.deterministic_route_selection,
maximum_number_of_retransmissions: traffic.maximum_number_of_retransmissions,
disable_main_poisson_packet_distribution: traffic
.disable_main_poisson_packet_distribution,
use_extended_packet_size: traffic.secondary_packet_size.is_some(),
@@ -88,6 +88,14 @@ pub struct TrafficWasmOverride {
#[tsify(optional)]
pub message_sending_average_delay_ms: Option<u32>,
/// Specify how many times particular packet can be retransmitted
#[tsify(optional)]
pub maximum_number_of_retransmissions: Option<u32>,
/// Specify whether route selection should be determined by the packet header.
#[tsify(optional)]
pub deterministic_route_selection: Option<bool>,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
#[tsify(optional)]
@@ -113,6 +121,10 @@ impl From<TrafficWasmOverride> for TrafficWasm {
message_sending_average_delay_ms: value
.message_sending_average_delay_ms
.unwrap_or(def.message_sending_average_delay_ms),
maximum_number_of_retransmissions: value.maximum_number_of_retransmissions,
deterministic_route_selection: value
.deterministic_route_selection
.unwrap_or(def.deterministic_route_selection),
disable_main_poisson_packet_distribution: value
.disable_main_poisson_packet_distribution
.unwrap_or(def.disable_main_poisson_packet_distribution),
+1
View File
@@ -26,6 +26,7 @@ log.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
time = { workspace = true }
nym-authenticator-requests = { path = "../authenticator-requests" }
nym-credential-verification = { path = "../credential-verification" }
+2 -1
View File
@@ -20,6 +20,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
pub(crate) mod error;
pub mod peer_controller;
pub mod peer_handle;
pub mod peer_storage_manager;
pub struct WgApiWrapper {
inner: WGApi,
@@ -118,7 +119,7 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
storage
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
.await?;
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
peer_bandwidth_managers.insert(peer.public_key.clone(), (bandwidth_manager, peer.clone()));
}
wg_api.create_interface()?;
let interface_config = InterfaceConfiguration {
+19 -5
View File
@@ -20,9 +20,9 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
use crate::{peer_handle::PeerHandle, peer_storage_manager::PeerStorageManager};
pub enum PeerControlRequest {
AddPeer {
@@ -79,7 +79,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
storage: St,
wg_api: Arc<WgApiWrapper>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
bw_storage_managers: HashMap<Key, (Option<SharedBandwidthStorageManager<St>>, Peer)>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
@@ -88,11 +88,16 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
for (public_key, (bandwidth_storage_manager, peer)) in bw_storage_managers.iter() {
let peer_storage_manager = PeerStorageManager::new(
storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
let mut handle = PeerHandle::new(
public_key.clone(),
host_information.clone(),
peer_storage_manager,
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
@@ -103,6 +108,10 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
}
});
}
let bw_storage_managers = bw_storage_managers
.into_iter()
.map(|(k, (m, _))| (k, m))
.collect();
PeerController {
storage,
@@ -184,10 +193,15 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let mut handle = PeerHandle::new(
let peer_storage_manager = PeerStorageManager::new(
self.storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
let mut handle = PeerHandle::new(
peer.public_key.clone(),
self.host_information.clone(),
peer_storage_manager,
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
+20 -10
View File
@@ -3,6 +3,7 @@
use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use crate::peer_storage_manager::PeerStorageManager;
use defguard_wireguard_rs::host::Peer;
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
@@ -21,9 +22,9 @@ pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageM
const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 30 days
pub struct PeerHandle<St> {
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager<St>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
@@ -33,9 +34,9 @@ pub struct PeerHandle<St> {
impl<St: Storage + Clone + 'static> PeerHandle<St> {
pub fn new(
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager<St>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
@@ -46,9 +47,9 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
let mut task_client = task_client.fork(format!("peer-{public_key}"));
task_client.disarm();
PeerHandle {
storage,
public_key,
host_information,
peer_storage_manager,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
@@ -84,16 +85,19 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
.ok_or(Error::InconsistentConsumedBytes)?
.try_into()
.map_err(|_| Error::InconsistentConsumedBytes)?;
if spent_bandwidth > 0
&& bandwidth_manager
if spent_bandwidth > 0 {
self.peer_storage_manager.update_trx(kernel_peer);
if bandwidth_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
let success = self.remove_peer().await?;
return Ok(!success);
{
let success = self.remove_peer().await?;
self.peer_storage_manager.remove_peer();
return Ok(!success);
}
}
} else {
if SystemTime::now().duration_since(self.startup_timestamp)? >= AUTO_REMOVE_AFTER {
@@ -132,7 +136,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
// the host information hasn't beed updated yet
continue;
};
let Some(storage_peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
let Some(storage_peer) = self.peer_storage_manager.get_peer() else {
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
return Ok(());
};
@@ -141,12 +145,18 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
return Ok(());
} else {
// Update storage values
self.storage.insert_wireguard_peer(&kernel_peer, self.bandwidth_storage_manager.is_some()).await?;
self.peer_storage_manager.sync_storage_peer().await?;
}
}
_ = self.task_client.recv() => {
log::trace!("PeerHandle: Received shutdown");
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
if let Err(e) = bandwidth_manager.write().await.sync_storage_bandwidth().await {
log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed");
}
}
log::trace!("PeerHandle: Finished shutdown");
}
}
}
@@ -0,0 +1,138 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use defguard_wireguard_rs::host::Peer;
use nym_gateway_storage::models::WireguardPeer;
use nym_gateway_storage::Storage;
use std::time::Duration;
use time::OffsetDateTime;
const DEFAULT_PEER_MAX_FLUSHING_RATE: Duration = Duration::from_secs(60 * 60 * 24); // 24h
const DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT: u64 = 512 * 1024 * 1024; // 512MB
#[derive(Debug, Clone, Copy)]
pub struct PeerFlushingBehaviourConfig {
/// Defines maximum delay between peer information being flushed to the persistent storage.
pub peer_max_flushing_rate: Duration,
/// Defines a maximum change in peer before it gets flushed to the persistent storage.
pub peer_max_delta_flushing_amount: u64,
}
impl Default for PeerFlushingBehaviourConfig {
fn default() -> Self {
Self {
peer_max_flushing_rate: DEFAULT_PEER_MAX_FLUSHING_RATE,
peer_max_delta_flushing_amount: DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT,
}
}
}
pub struct PeerStorageManager<S> {
pub(crate) storage: S,
pub(crate) peer_information: Option<PeerInformation>,
pub(crate) cfg: PeerFlushingBehaviourConfig,
pub(crate) with_client_id: bool,
}
impl<S: Storage + Clone + 'static> PeerStorageManager<S> {
pub(crate) fn new(storage: S, peer: Peer, with_client_id: bool) -> Self {
let peer_information = Some(PeerInformation::new(peer));
Self {
storage,
peer_information,
cfg: PeerFlushingBehaviourConfig::default(),
with_client_id,
}
}
pub(crate) fn get_peer(&self) -> Option<WireguardPeer> {
self.peer_information
.as_ref()
.map(|p| p.peer.clone().into())
}
pub(crate) fn remove_peer(&mut self) {
self.peer_information = None;
}
pub(crate) fn update_trx(&mut self, kernel_peer: &Peer) {
if let Some(peer_information) = self.peer_information.as_mut() {
peer_information.update_trx_bytes(kernel_peer.tx_bytes, kernel_peer.rx_bytes);
}
}
pub(crate) async fn sync_storage_peer(&mut self) -> Result<(), Error> {
let Some(peer_information) = self.peer_information.as_mut() else {
return Ok(());
};
if !peer_information.should_sync(self.cfg) {
return Ok(());
}
if self
.storage
.get_wireguard_peer(&peer_information.peer().public_key.to_string())
.await?
.is_none()
{
self.peer_information = None;
return Ok(());
}
self.storage
.insert_wireguard_peer(peer_information.peer(), self.with_client_id)
.await?;
peer_information.resync_peer_with_storage();
Ok(())
}
}
#[derive(Clone, Debug)]
pub(crate) struct PeerInformation {
pub(crate) peer: Peer,
pub(crate) last_synced: OffsetDateTime,
pub(crate) bytes_delta_since_sync: u64,
}
impl PeerInformation {
pub fn new(peer: Peer) -> PeerInformation {
PeerInformation {
peer,
last_synced: OffsetDateTime::now_utc(),
bytes_delta_since_sync: 0,
}
}
pub(crate) fn should_sync(&self, cfg: PeerFlushingBehaviourConfig) -> bool {
if self.bytes_delta_since_sync >= cfg.peer_max_delta_flushing_amount {
return true;
}
if self.last_synced + cfg.peer_max_flushing_rate < OffsetDateTime::now_utc()
&& self.bytes_delta_since_sync != 0
{
return true;
}
false
}
pub(crate) fn peer(&self) -> &Peer {
&self.peer
}
pub(crate) fn update_trx_bytes(&mut self, tx_bytes: u64, rx_bytes: u64) {
self.bytes_delta_since_sync += tx_bytes.saturating_sub(self.peer.tx_bytes)
+ rx_bytes.saturating_sub(self.peer.rx_bytes);
self.peer.tx_bytes = tx_bytes;
self.peer.rx_bytes = rx_bytes;
}
pub(crate) fn resync_peer_with_storage(&mut self) {
self.bytes_delta_since_sync = 0;
self.last_synced = OffsetDateTime::now_utc();
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "explorer-api"
version = "1.1.42"
version = "1.1.43"
edition = "2021"
license.workspace = true
+1 -1
View File
@@ -166,7 +166,7 @@ impl GeoIp {
}
}
impl<'a> TryFrom<&City<'a>> for Location {
impl TryFrom<&City<'_>> for Location {
type Error = String;
fn try_from(city: &City) -> Result<Self, Self::Error> {
+1 -1
View File
@@ -65,7 +65,7 @@ impl<'r> FromRequest<'r> for Location {
}
}
impl<'a> OpenApiFromRequest<'a> for Location {
impl OpenApiFromRequest<'_> for Location {
fn from_request_input(
_gen: &mut OpenApiGenerator,
_name: String,
+46 -12
View File
@@ -5,6 +5,8 @@ use async_trait::async_trait;
use nym_sdk::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent};
use nym_topology::{gateway, NymTopology, TopologyProvider};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tracing::debug;
use url::Url;
@@ -17,6 +19,7 @@ pub struct GatewayTopologyProvider {
impl GatewayTopologyProvider {
pub fn new(
gateway_node: gateway::LegacyNode,
cache_ttl: Duration,
user_agent: UserAgent,
nym_api_url: Vec<Url>,
) -> GatewayTopologyProvider {
@@ -31,6 +34,9 @@ impl GatewayTopologyProvider {
env!("CARGO_PKG_VERSION").to_string(),
Some(user_agent),
),
cache_ttl,
cached_at: OffsetDateTime::UNIX_EPOCH,
cached: None,
gateway_node,
})),
}
@@ -39,25 +45,53 @@ impl GatewayTopologyProvider {
struct GatewayTopologyProviderInner {
inner: NymApiTopologyProvider,
cache_ttl: Duration,
cached_at: OffsetDateTime,
cached: Option<NymTopology>,
gateway_node: gateway::LegacyNode,
}
impl GatewayTopologyProviderInner {
fn cached_topology(&self) -> Option<NymTopology> {
if let Some(cached_topology) = &self.cached {
if self.cached_at + self.cache_ttl > OffsetDateTime::now_utc() {
return Some(cached_topology.clone());
}
}
None
}
async fn update_cache(&mut self) -> Option<NymTopology> {
let updated_cache = match self.inner.get_new_topology().await {
None => None,
Some(mut base) => {
if !base.gateway_exists(&self.gateway_node.identity_key) {
debug!(
"{} didn't exist in topology. inserting it.",
self.gateway_node.identity_key
);
base.insert_gateway(self.gateway_node.clone());
}
Some(base)
}
};
self.cached_at = OffsetDateTime::now_utc();
self.cached = updated_cache.clone();
updated_cache
}
}
#[async_trait]
impl TopologyProvider for GatewayTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
let mut guard = self.inner.lock().await;
match guard.inner.get_new_topology().await {
None => None,
Some(mut base) => {
if !base.gateway_exists(&guard.gateway_node.identity_key) {
debug!(
"{} didn't exist in topology. inserting it.",
guard.gateway_node.identity_key
);
base.insert_gateway(guard.gateway_node.clone());
}
Some(base)
}
// check the cache
if let Some(cached) = guard.cached_topology() {
return Some(cached);
}
guard.update_cache().await
}
}
+26 -16
View File
@@ -30,6 +30,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
pub(crate) mod client_handling;
@@ -148,8 +149,11 @@ impl<St> Gateway<St> {
}
fn gateway_topology_provider(&self) -> GatewayTopologyProvider {
// TODO: make topology ttl configurable
// (to be done in reeses with the final smooshing)
GatewayTopologyProvider::new(
self.as_topology_node(),
Duration::from_secs(5 * 60),
self.user_agent.clone(),
self.config.gateway.nym_api_urls.clone(),
)
@@ -231,22 +235,28 @@ impl<St> Gateway<St> {
forwarding_channel,
router_tx,
);
let all_peers = self.client_storage.get_all_wireguard_peers().await?;
let used_private_network_ips = all_peers
.iter()
.cloned()
.map(|wireguard_peer| {
defguard_wireguard_rs::host::Peer::try_from(wireguard_peer).map(|mut peer| {
peer.allowed_ips
.pop()
.ok_or(Box::new(GatewayError::InternalWireguardError(format!(
"no private IP set for peer {}",
peer.public_key
))))
.map(|p| p.ip)
})
})
.collect::<Result<Result<Vec<_>, _>, _>>()??;
let mut used_private_network_ips = vec![];
let mut all_peers = vec![];
for wireguard_peer in self
.client_storage
.get_all_wireguard_peers()
.await?
.into_iter()
{
let mut peer = defguard_wireguard_rs::host::Peer::try_from(wireguard_peer.clone())?;
let Some(peer) = peer.allowed_ips.pop() else {
tracing::warn!(
"Peer {} has empty allowed ips. It will be removed",
peer.public_key
);
self.client_storage
.remove_wireguard_peer(&peer.public_key.to_string())
.await?;
continue;
};
used_private_network_ips.push(peer.ip);
all_peers.push(wireguard_peer);
}
if let Some(wireguard_data) = self.wireguard_data.take() {
let (on_start_tx, on_start_rx) = oneshot::channel();
+3 -2
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.46"
version = "1.1.47"
authors.workspace = true
edition = "2021"
rust-version.workspace = true
@@ -20,6 +20,7 @@ bloomfilter = { workspace = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
console-subscriber = { workspace = true, optional = true } # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
dashmap = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
@@ -120,7 +121,7 @@ nym-types = { path = "../common/types" }
nym-http-api-common = { path = "../common/http-api-common", features = ["utoipa"] }
nym-serde-helpers = { path = "../common/serde-helpers", features = ["date"] }
nym-ticketbooks-merkle = { path = "../common/ticketbooks-merkle" }
nym-statistics-common = {path ="../common/statistics" }
nym-statistics-common = { path = "../common/statistics" }
[features]
no-reward = []
@@ -0,0 +1,8 @@
/*
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
CREATE INDEX IF NOT EXISTS monitor_run_id on monitor_run(id);
CREATE INDEX IF NOT EXISTS monitor_run_timestamp on monitor_run(timestamp);
CREATE INDEX IF NOT EXISTS testing_route_monitor_run_id on testing_route(monitor_run_id);
+1 -1
View File
@@ -64,7 +64,7 @@ pub(crate) mod overengineered_offset_date_time_serde {
])),
];
impl<'de> Visitor<'de> for OffsetDateTimeVisitor {
impl Visitor<'_> for OffsetDateTimeVisitor {
type Value = OffsetDateTime;
fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
+11 -21
View File
@@ -6,7 +6,7 @@ use crate::ecash::error::EcashError;
use crate::ecash::state::EcashState;
use crate::node_status_api::models::AxumResult;
use crate::support::http::state::AppState;
use axum::extract::Path;
use axum::extract::{Query, State};
use axum::{Json, Router};
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
@@ -21,28 +21,19 @@ use tracing::trace;
use utoipa::IntoParams;
/// routes with globally aggregated keys, signatures, etc.
pub(crate) fn aggregation_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
pub(crate) fn aggregation_routes() -> Router<AppState> {
Router::new()
.route(
"/master-verification-key",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|epoch_id| master_verification_key(epoch_id, ecash_state)
}),
axum::routing::get(master_verification_key),
)
.route(
"/aggregated-expiration-date-signatures",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|expiration_date| expiration_date_signatures(expiration_date, ecash_state)
}),
axum::routing::get(expiration_date_signatures),
)
.route(
"/aggregated-coin-indices-signatures",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|epoch_id| coin_indices_signatures(epoch_id, ecash_state)
}),
axum::routing::get(coin_indices_signatures),
)
}
@@ -58,8 +49,8 @@ pub(crate) fn aggregation_routes(ecash_state: Arc<EcashState>) -> Router<AppStat
)
)]
async fn master_verification_key(
Path(EpochIdParam { epoch_id }): Path<EpochIdParam>,
state: Arc<EcashState>,
State(state): State<Arc<EcashState>>,
Query(EpochIdParam { epoch_id }): Query<EpochIdParam>,
) -> AxumResult<Json<VerificationKeyResponse>> {
trace!("aggregated_verification_key request");
@@ -72,7 +63,6 @@ async fn master_verification_key(
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct ExpirationDateParam {
expiration_date: Option<String>,
}
@@ -89,8 +79,8 @@ struct ExpirationDateParam {
)
)]
async fn expiration_date_signatures(
Path(ExpirationDateParam { expiration_date }): Path<ExpirationDateParam>,
state: Arc<EcashState>,
State(state): State<Arc<EcashState>>,
Query(ExpirationDateParam { expiration_date }): Query<ExpirationDateParam>,
) -> AxumResult<Json<AggregatedExpirationDateSignatureResponse>> {
trace!("aggregated_expiration_date_signatures request");
@@ -126,8 +116,8 @@ async fn expiration_date_signatures(
)
)]
async fn coin_indices_signatures(
Path(EpochIdParam { epoch_id }): Path<EpochIdParam>,
state: Arc<EcashState>,
Query(EpochIdParam { epoch_id }): Query<EpochIdParam>,
State(state): State<Arc<EcashState>>,
) -> AxumResult<Json<AggregatedCoinIndicesSignatureResponse>> {
trace!("aggregated_coin_indices_signatures request");
+5 -7
View File
@@ -5,15 +5,13 @@ use crate::ecash::api_routes::aggregation::aggregation_routes;
use crate::ecash::api_routes::issued::issued_routes;
use crate::ecash::api_routes::partial_signing::partial_signing_routes;
use crate::ecash::api_routes::spending::spending_routes;
use crate::ecash::state::EcashState;
use crate::support::http::state::AppState;
use axum::Router;
use std::sync::Arc;
pub(crate) fn ecash_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
pub(crate) fn ecash_routes() -> Router<AppState> {
Router::new()
.merge(aggregation_routes(Arc::clone(&ecash_state)))
.merge(issued_routes(Arc::clone(&ecash_state)))
.merge(partial_signing_routes(Arc::clone(&ecash_state)))
.merge(spending_routes(Arc::clone(&ecash_state)))
.merge(aggregation_routes())
.merge(issued_routes())
.merge(partial_signing_routes())
.merge(spending_routes())
}
-1
View File
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: GPL-3.0-only
#[derive(serde::Deserialize, utoipa::IntoParams)]
#[into_params(parameter_in = Path)]
pub(super) struct EpochIdParam {
pub(super) epoch_id: Option<u64>,
}
+6 -12
View File
@@ -4,7 +4,7 @@
use crate::ecash::state::EcashState;
use crate::node_status_api::models::AxumResult;
use crate::support::http::state::AppState;
use axum::extract::Path;
use axum::extract::{Path, State};
use axum::{Json, Router};
use nym_api_requests::ecash::models::{
IssuedTicketbooksChallengeRequest, IssuedTicketbooksChallengeResponse,
@@ -17,21 +17,15 @@ use time::Date;
use tracing::trace;
use utoipa::{IntoParams, ToSchema};
pub(crate) fn issued_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
pub(crate) fn issued_routes() -> Router<AppState> {
Router::new()
.route(
"/issued-ticketbooks-for/:expiration_date",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|expiration_date| issued_ticketbooks_for(expiration_date, ecash_state)
}),
axum::routing::get(issued_ticketbooks_for),
)
.route(
"/issued-ticketbooks-challenge",
axum::routing::post({
let ecash_state = Arc::clone(&ecash_state);
|body| issued_ticketbooks_challenge(body, ecash_state)
}),
axum::routing::post(issued_ticketbooks_challenge),
)
}
@@ -58,8 +52,8 @@ pub(crate) struct ExpirationDatePathParam {
)
)]
async fn issued_ticketbooks_for(
State(state): State<Arc<EcashState>>,
Path(ExpirationDatePathParam { expiration_date }): Path<ExpirationDatePathParam>,
state: Arc<EcashState>,
) -> AxumResult<Json<IssuedTicketbooksForResponse>> {
state.ensure_signer().await?;
@@ -83,8 +77,8 @@ async fn issued_ticketbooks_for(
)
)]
async fn issued_ticketbooks_challenge(
State(state): State<Arc<EcashState>>,
Json(challenge): Json<IssuedTicketbooksChallengeRequest>,
state: Arc<EcashState>,
) -> AxumResult<Json<IssuedTicketbooksChallengeResponse>> {
trace!("replying to ticketbooks challenge: {:?}", challenge);
state.ensure_signer().await?;
@@ -7,7 +7,7 @@ use crate::ecash::helpers::blind_sign;
use crate::ecash::state::EcashState;
use crate::node_status_api::models::AxumResult;
use crate::support::http::state::AppState;
use axum::extract::Query;
use axum::extract::{Query, State};
use axum::{Json, Router};
use nym_api_requests::ecash::{
BlindSignRequestBody, BlindedSignatureResponse, PartialCoinIndicesSignatureResponse,
@@ -22,28 +22,16 @@ use time::Date;
use tracing::{debug, trace};
use utoipa::IntoParams;
pub(crate) fn partial_signing_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
pub(crate) fn partial_signing_routes() -> Router<AppState> {
Router::new()
.route(
"/blind-sign",
axum::routing::post({
let ecash_state = Arc::clone(&ecash_state);
|body| post_blind_sign(body, ecash_state)
}),
)
.route("/blind-sign", axum::routing::post(post_blind_sign))
.route(
"/partial-expiration-date-signatures",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|expiration_date| partial_expiration_date_signatures(expiration_date, ecash_state)
}),
axum::routing::get(partial_expiration_date_signatures),
)
.route(
"/partial-coin-indices-signatures",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|epoch_id| partial_coin_indices_signatures(epoch_id, ecash_state)
}),
axum::routing::get(partial_coin_indices_signatures),
)
}
@@ -59,8 +47,8 @@ pub(crate) fn partial_signing_routes(ecash_state: Arc<EcashState>) -> Router<App
)
)]
async fn post_blind_sign(
State(state): State<Arc<EcashState>>,
Json(blind_sign_request_body): Json<BlindSignRequestBody>,
state: Arc<EcashState>,
) -> AxumResult<Json<BlindedSignatureResponse>> {
state.ensure_signer().await?;
@@ -134,8 +122,8 @@ struct ExpirationDateParam {
)
)]
async fn partial_expiration_date_signatures(
State(state): State<Arc<EcashState>>,
Query(ExpirationDateParam { expiration_date }): Query<ExpirationDateParam>,
state: Arc<EcashState>,
) -> AxumResult<Json<PartialExpirationDateSignatureResponse>> {
state.ensure_signer().await?;
@@ -172,8 +160,8 @@ async fn partial_expiration_date_signatures(
)
)]
async fn partial_coin_indices_signatures(
State(state): State<Arc<EcashState>>,
Query(EpochIdParam { epoch_id }): Query<EpochIdParam>,
state: Arc<EcashState>,
) -> AxumResult<Json<PartialCoinIndicesSignatureResponse>> {
state.ensure_signer().await?;
+8 -21
View File
@@ -5,6 +5,7 @@ use crate::ecash::error::EcashError;
use crate::ecash::state::EcashState;
use crate::node_status_api::models::{AxumErrorResponse, AxumResult};
use crate::support::http::state::AppState;
use axum::extract::State;
use axum::{Json, Router};
use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
use nym_api_requests::ecash::models::{
@@ -21,28 +22,16 @@ use time::{OffsetDateTime, Time};
use tracing::{error, warn};
#[allow(deprecated)]
pub(crate) fn spending_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
pub(crate) fn spending_routes() -> Router<AppState> {
Router::new()
.route(
"/verify-ecash-ticket",
axum::routing::post({
let ecash_state = Arc::clone(&ecash_state);
|body| verify_ticket(body, ecash_state)
}),
)
.route("/verify-ecash-ticket", axum::routing::post(verify_ticket))
.route(
"/batch-redeem-ecash-tickets",
axum::routing::post({
let ecash_state = Arc::clone(&ecash_state);
|body| batch_redeem_tickets(body, ecash_state)
}),
axum::routing::post(batch_redeem_tickets),
)
.route(
"/double-spending-filter-v1",
axum::routing::get({
let ecash_state = Arc::clone(&ecash_state);
|| double_spending_filter_v1(ecash_state)
}),
axum::routing::get(double_spending_filter_v1),
)
}
@@ -67,9 +56,9 @@ fn reject_ticket(
)
)]
async fn verify_ticket(
State(state): State<Arc<EcashState>>,
// TODO in the future: make it send binary data rather than json
Json(verify_ticket_body): Json<VerifyEcashTicketBody>,
state: Arc<EcashState>,
) -> AxumResult<Json<EcashTicketVerificationResponse>> {
state.ensure_signer().await?;
@@ -170,9 +159,9 @@ async fn verify_ticket(
)
)]
async fn batch_redeem_tickets(
State(state): State<Arc<EcashState>>,
// TODO in the future: make it send binary data rather than json
Json(batch_redeem_credentials_body): Json<BatchRedeemTicketsBody>,
state: Arc<EcashState>,
) -> AxumResult<Json<EcashBatchTicketRedemptionResponse>> {
state.ensure_signer().await?;
@@ -244,8 +233,6 @@ async fn batch_redeem_tickets(
)
)]
#[deprecated]
async fn double_spending_filter_v1(
_state: Arc<EcashState>,
) -> AxumResult<Json<SpentCredentialsResponse>> {
async fn double_spending_filter_v1() -> AxumResult<Json<SpentCredentialsResponse>> {
AxumResult::Err(AxumErrorResponse::internal_msg("permanently restricted"))
}
+1 -1
View File
@@ -27,7 +27,7 @@ use nym_validator_client::EcashApiClient;
#[async_trait]
pub trait Client {
async fn address(&self) -> AccountId;
async fn address(&self) -> Result<AccountId>;
async fn dkg_contract_address(&self) -> Result<AccountId>;
+4 -4
View File
@@ -35,7 +35,7 @@ impl DkgClient {
}
}
pub(crate) async fn get_address(&self) -> AccountId {
pub(crate) async fn get_address(&self) -> Result<AccountId, EcashError> {
self.inner.address().await
}
@@ -53,7 +53,7 @@ impl DkgClient {
pub(crate) async fn group_member(&self) -> Result<MemberResponse, EcashError> {
self.inner
.group_member(self.get_address().await.to_string())
.group_member(self.get_address().await?.to_string())
.await
}
@@ -126,7 +126,7 @@ impl DkgClient {
&self,
epoch_id: EpochId,
) -> Result<Option<ContractVKShare>, EcashError> {
let address = self.inner.address().await;
let address = self.inner.address().await?;
self.get_verification_key_share(epoch_id, address).await
}
@@ -138,7 +138,7 @@ impl DkgClient {
}
pub(crate) async fn get_vote(&self, proposal_id: u64) -> Result<VoteResponse, EcashError> {
let address = self.get_address().await.to_string();
let address = self.get_address().await?.to_string();
self.inner.get_vote(proposal_id, address).await
}
+2 -2
View File
@@ -155,7 +155,7 @@ impl<R: RngCore + CryptoRng> DkgController<R> {
resharing: bool,
) -> Result<(), DealingGenerationError> {
let dealing_state = self.state.dealing_exchange_state(epoch_id)?;
let address = self.dkg_client.get_address().await.to_string();
let address = self.dkg_client.get_address().await?.to_string();
let status = self
.dkg_client
@@ -259,7 +259,7 @@ impl<R: RngCore + CryptoRng> DkgController<R> {
.checked_sub(1)
.expect("resharing epoch invariant has been broken");
let address = self.dkg_client.get_address().await;
let address = self.dkg_client.get_address().await?;
Ok(self
.dkg_client
.dealer_in_epoch(previous_epoch_id, address.to_string())
+1 -1
View File
@@ -494,7 +494,7 @@ impl<R: RngCore + CryptoRng> DkgController<R> {
// submitted proposals and find the one with our address
self.get_validation_proposals()
.await?
.get(self.dkg_client.get_address().await.as_ref())
.get(self.dkg_client.get_address().await?.as_ref())
.copied()
.ok_or(KeyDerivationError::UnrecoverableProposalId)
}
+4 -4
View File
@@ -155,7 +155,7 @@ impl<R: RngCore + CryptoRng> DkgController<R> {
};
// if this is our share, obviously vote for yes without spending time on verification
if owner.as_ref() == self.dkg_client.get_address().await.as_ref() {
if owner.as_ref() == self.dkg_client.get_address().await?.as_ref() {
votes.insert(*proposal_id, true);
continue;
}
@@ -313,7 +313,7 @@ mod tests {
exchange_dealings(&mut controllers, false).await;
derive_keypairs(&mut controllers, false).await;
let first_dealer = controllers[0].dkg_client.get_address().await;
let first_dealer = controllers[0].dkg_client.get_address().await?;
{
let mut guard = chain.lock().unwrap();
@@ -365,8 +365,8 @@ mod tests {
exchange_dealings(&mut controllers, false).await;
derive_keypairs(&mut controllers, false).await;
let first_dealer = controllers[0].dkg_client.get_address().await;
let second_dealer = controllers[1].dkg_client.get_address().await;
let first_dealer = controllers[0].dkg_client.get_address().await?;
let second_dealer = controllers[1].dkg_client.get_address().await?;
{
let mut guard = chain.lock().unwrap();
+3
View File
@@ -25,6 +25,9 @@ pub enum EcashError {
#[error(transparent)]
IOError(#[from] std::io::Error),
#[error("this instance is running without on-chain signing capabilities so no transactions can be sent")]
ChainSignerNotEnabled,
#[error("this operation couldn't be completed as this nym-api is not an active ecash signer")]
NotASigner,
+7 -5
View File
@@ -139,7 +139,9 @@ impl EcashState {
.local
.active_signer
.get_or_init(epoch_id, || async {
let address = self.aux.client.address().await;
let Ok(address) = self.aux.client.address().await else {
return Ok(false);
};
let ecash_signers = self.aux.comm_channel.ecash_clients(epoch_id).await?;
// check if any ecash signers for this epoch has the same cosmos address as this api
@@ -246,7 +248,7 @@ impl EcashState {
let threshold = self.aux.comm_channel.ecash_threshold(epoch_id).await?;
// let mut shares = Mutex::new(Vec::with_capacity(all_apis.len()));
let cosmos_address = self.aux.client.address().await;
let cosmos_address = self.aux.client.address().await.ok();
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
@@ -256,7 +258,7 @@ impl EcashState {
// check if we're attempting to query ourselves, in that case just get local signature
// rather than making the http query
let partial = if api.cosmos_address == cosmos_address {
let partial = if Some(api.cosmos_address) == cosmos_address {
self.partial_coin_index_signatures(Some(epoch_id))
.await?
.signatures
@@ -380,7 +382,7 @@ impl EcashState {
let all_apis = self.aux.comm_channel.ecash_clients(epoch_id).await?;
let threshold = self.aux.comm_channel.ecash_threshold(epoch_id).await?;
let cosmos_address = self.aux.client.address().await;
let cosmos_address = self.aux.client.address().await.ok();
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
@@ -390,7 +392,7 @@ impl EcashState {
// check if we're attempting to query ourselves, in that case just get local signature
// rather than making the http query
let partial = if api.cosmos_address == cosmos_address {
let partial = if Some(api.cosmos_address) == cosmos_address {
self.partial_expiration_date_signatures(expiration_date)
.await?
.signatures
+1 -1
View File
@@ -263,7 +263,7 @@ pub(crate) struct TestingDkgController {
impl TestingDkgController {
pub async fn address(&self) -> AccountId {
self.dkg_client.get_address().await
self.dkg_client.get_address().await.unwrap()
}
pub async fn cw_address(&self) -> Addr {
+2 -2
View File
@@ -51,7 +51,7 @@ pub(crate) async fn initialise_dkg(controllers: &mut [TestingDkgController], res
// add every dealer to group contract
for controller in controllers.iter() {
let address = controller.dkg_client.get_address().await;
let address = controller.dkg_client.get_address().await.unwrap();
let mut chain = controllers[0].chain_state.lock().unwrap();
chain.add_member(address.as_ref(), 10);
}
@@ -76,7 +76,7 @@ pub(crate) async fn submit_public_keys(controllers: &mut [TestingDkgController],
.unwrap();
}
let threshold = (2 * controllers.len() as u64 + 3 - 1) / 3;
let threshold = (2 * controllers.len() as u64).div_ceil(3);
let mut guard = controllers[0].chain_state.lock().unwrap();
guard.dkg_contract.epoch.state = EpochState::DealingExchange { resharing };
+10 -7
View File
@@ -11,9 +11,10 @@ use crate::node_describe_cache::DescribedNodes;
use crate::node_status_api::handlers::unstable;
use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::status::ApiStatusState;
use crate::support::caching::cache::SharedCache;
use crate::support::config;
use crate::support::http::state::AppState;
use crate::support::http::state::{AppState, ForcedRefresh};
use crate::support::storage::NymApiStorage;
use async_trait::async_trait;
use axum::Router;
@@ -524,8 +525,8 @@ impl DummyClient {
#[async_trait]
impl super::client::Client for DummyClient {
async fn address(&self) -> AccountId {
self.validator_address.clone()
async fn address(&self) -> Result<AccountId> {
Ok(self.validator_address.clone())
}
async fn dkg_contract_address(&self) -> Result<AccountId> {
@@ -1262,9 +1263,9 @@ struct TestFixture {
}
impl TestFixture {
fn build_app_state(storage: NymApiStorage) -> AppState {
fn build_app_state(storage: NymApiStorage, ecash_state: EcashState) -> AppState {
AppState {
forced_refresh: Default::default(),
forced_refresh: ForcedRefresh::new(true),
nym_contract_cache: NymContractCache::new(),
node_status_cache: NodeStatusCache::new(),
circulating_supply_cache: CirculatingSupplyCache::new("unym".to_owned()),
@@ -1275,6 +1276,8 @@ impl TestFixture {
NymNetworkDetails::new_empty(),
),
node_info_cache: unstable::NodeInfoCache::default(),
api_status: ApiStatusState::new(None),
ecash_state: Arc::new(ecash_state),
}
}
@@ -1337,8 +1340,8 @@ impl TestFixture {
TestFixture {
axum: TestServer::new(
Router::new()
.nest("/v1/ecash", ecash_routes(Arc::new(ecash_state)))
.with_state(Self::build_app_state(storage.clone())),
.nest("/v1/ecash", ecash_routes())
.with_state(Self::build_app_state(storage.clone(), ecash_state)),
)
.unwrap(),
storage,
+3
View File
@@ -11,6 +11,9 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum RewardingError {
#[error("this instance is running without on-chain signing capabilities so no transactions can be sent")]
ChainSignerNotEnabled,
#[error("Our account ({our_address}) is not permitted to update rewarded set and perform rewarding. The allowed address is {allowed_address}")]
Unauthorised {
our_address: AccountId,
+9 -4
View File
@@ -134,9 +134,12 @@ impl EpochAdvancer {
let epoch_status = self.nyxd_client.get_current_epoch_status().await?;
if !epoch_status.is_in_progress() {
if epoch_status.being_advanced_by.as_str()
!= self.nyxd_client.client_address().await.as_ref()
{
// SAFETY: before `EpochAdvancer` is started, `ensure_rewarding_permission` is called
// which is not allowed to progress if this instance is not using a signing client
#[allow(clippy::unwrap_used)]
let address = self.nyxd_client.client_address().await.unwrap();
if epoch_status.being_advanced_by.as_str() != address.as_ref() {
// another nym-api is already handling
error!("another nym-api ({}) is already advancing the epoch... but we shouldn't have other nym-apis yet!", epoch_status.being_advanced_by);
return Ok(());
@@ -318,8 +321,10 @@ impl EpochAdvancer {
pub(crate) async fn ensure_rewarding_permission(
nyxd_client: &Client,
) -> Result<(), RewardingError> {
let Some(our_address) = nyxd_client.client_address().await else {
return Err(RewardingError::ChainSignerNotEnabled);
};
let allowed_address = nyxd_client.get_rewarding_validator_address().await?;
let our_address = nyxd_client.client_address().await;
if allowed_address != our_address {
Err(RewardingError::Unauthorised {
our_address,
@@ -59,7 +59,7 @@ impl GatewayClientHandle {
}
}
impl<'a> UnlockedGatewayClientHandle<'a> {
impl UnlockedGatewayClientHandle<'_> {
pub(crate) fn get_mut_unchecked(
&mut self,
) -> &mut GatewayClient<nyxd::Client, PersistentStorage> {
+1 -5
View File
@@ -14,7 +14,6 @@ use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::MessageReceiver;
use nym_task::TaskClient;
use std::collections::{HashMap, HashSet};
use std::process;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info, trace};
@@ -95,10 +94,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
)
.await
{
error!("Failed to submit monitor run information to the database - {err}",);
// TODO: slightly more graceful shutdown here
process::exit(1);
error!("Failed to submit monitor run information to the database: {err}",);
}
}
@@ -121,6 +121,7 @@ impl PacketPreparer {
test_route.topology().clone(),
self_address,
PacketSize::RegularPacket,
false,
DEFAULT_AVERAGE_PACKET_DELAY,
DEFAULT_AVERAGE_ACK_DELAY,
self.ack_key.clone(),
+37 -12
View File
@@ -57,6 +57,9 @@ pub enum NodeDescribeCacheError {
// TODO: perhaps include more details here like whether key/signature/payload was malformed
#[error("could not verify signed host information for node {node_id}")]
MissignedHostInformation { node_id: NodeId },
#[error("node {node_id} is announcing an illegal ip address")]
IllegalIpAddress { node_id: NodeId },
}
// this exists because I've been moving things around quite a lot and now the place that holds the type
@@ -199,13 +202,18 @@ impl DescribedNodes {
pub struct NodeDescriptionProvider {
contract_cache: NymContractCache,
allow_all_ips: bool,
batch_size: usize,
}
impl NodeDescriptionProvider {
pub(crate) fn new(contract_cache: NymContractCache) -> NodeDescriptionProvider {
pub(crate) fn new(
contract_cache: NymContractCache,
allow_all_ips: bool,
) -> NodeDescriptionProvider {
NodeDescriptionProvider {
contract_cache,
allow_all_ips,
batch_size: DEFAULT_NODE_DESCRIBE_BATCH_SIZE,
}
}
@@ -270,6 +278,7 @@ async fn try_get_client(
async fn try_get_description(
data: RefreshData,
allow_all_ips: bool,
) -> Result<NymNodeDescription, NodeDescribeCacheError> {
let client = try_get_client(&data.host, data.node_id, data.port).await?;
@@ -286,6 +295,12 @@ async fn try_get_description(
});
}
if !allow_all_ips && !host_info.data.check_ips() {
return Err(NodeDescribeCacheError::IllegalIpAddress {
node_id: data.node_id,
});
}
let node_info = query_for_described_data(&client, data.node_id).await?;
let description = node_info.into_node_description(host_info.data);
@@ -357,8 +372,8 @@ impl RefreshData {
self.node_id
}
pub(crate) async fn try_refresh(self) -> Option<NymNodeDescription> {
match try_get_description(self).await {
pub(crate) async fn try_refresh(self, allow_all_ips: bool) -> Option<NymNodeDescription> {
match try_get_description(self, allow_all_ips).await {
Ok(description) => Some(description),
Err(err) => {
debug!("failed to obtain node self-described data: {err}");
@@ -412,11 +427,15 @@ impl CacheItemProvider for NodeDescriptionProvider {
}
}
let nodes = stream::iter(nodes_to_query.into_iter().map(|n| n.try_refresh()))
.buffer_unordered(self.batch_size)
.filter_map(|x| async move { x.map(|d| (d.node_id, d)) })
.collect::<HashMap<_, _>>()
.await;
let nodes = stream::iter(
nodes_to_query
.into_iter()
.map(|n| n.try_refresh(self.allow_all_ips)),
)
.buffer_unordered(self.batch_size)
.filter_map(|x| async move { x.map(|d| (d.node_id, d)) })
.collect::<HashMap<_, _>>()
.await;
info!("refreshed self described data for {} nodes", nodes.len());
@@ -432,8 +451,11 @@ pub(crate) fn new_refresher(
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new(
Box::new(
NodeDescriptionProvider::new(contract_cache)
.with_batch_size(config.debug.node_describe_batch_size),
NodeDescriptionProvider::new(
contract_cache,
config.debug.node_describe_allow_illegal_ips,
)
.with_batch_size(config.debug.node_describe_batch_size),
),
config.debug.node_describe_caching_interval,
)
@@ -446,8 +468,11 @@ pub(crate) fn new_refresher_with_initial_value(
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new_with_initial_value(
Box::new(
NodeDescriptionProvider::new(contract_cache)
.with_batch_size(config.debug.node_describe_batch_size),
NodeDescriptionProvider::new(
contract_cache,
config.debug.node_describe_allow_illegal_ips,
)
.with_batch_size(config.debug.node_describe_batch_size),
),
config.debug.node_describe_caching_interval,
initial,
@@ -88,7 +88,6 @@ pub(crate) async fn submit_gateway_monitoring_results(
match state
.storage
.manager
.submit_gateway_statuses_v2(message.results())
.await
{
@@ -133,7 +132,6 @@ pub(crate) async fn submit_node_monitoring_results(
match state
.storage
.manager
.submit_mixnode_statuses_v2(message.results())
.await
{
+2 -1
View File
@@ -86,8 +86,9 @@ async fn refresh_described(
}
// to make sure you can't ddos the endpoint while a request is in progress
state.forced_refresh.set_last_refreshed(node_id).await;
let allow_all_ips = state.forced_refresh.allow_all_ip_addresses;
if let Some(updated_data) = refresh_data.try_refresh().await {
if let Some(updated_data) = refresh_data.try_refresh(allow_all_ips).await {
let Ok(mut describe_cache) = state.described_nodes_cache.write().await else {
return Err(AxumErrorResponse::service_unavailable());
};
+9 -24
View File
@@ -4,37 +4,20 @@
use crate::node_status_api::models::{AxumErrorResponse, AxumResult};
use crate::status::ApiStatusState;
use crate::support::http::state::AppState;
use axum::extract::State;
use axum::Json;
use axum::Router;
use nym_api_requests::models::{ApiHealthResponse, SignerInformationResponse};
use nym_bin_common::build_information::BinaryBuildInformationOwned;
use nym_compact_ecash::Base58;
use std::sync::Arc;
pub(crate) fn api_status_routes() -> Router<AppState> {
let api_status_state = Arc::new(ApiStatusState::new());
Router::new()
.route(
"/health",
axum::routing::get({
let state = Arc::clone(&api_status_state);
|| health(state)
}),
)
.route(
"/build-information",
axum::routing::get({
let state = Arc::clone(&api_status_state);
|| build_information(state)
}),
)
.route("/health", axum::routing::get(health))
.route("/build-information", axum::routing::get(build_information))
.route(
"/signer-information",
axum::routing::get({
let state = Arc::clone(&api_status_state);
|| signer_information(state)
}),
axum::routing::get(signer_information),
)
}
@@ -46,7 +29,7 @@ pub(crate) fn api_status_routes() -> Router<AppState> {
(status = 200, body = ApiHealthResponse)
)
)]
async fn health(state: Arc<ApiStatusState>) -> Json<ApiHealthResponse> {
async fn health(State(state): State<ApiStatusState>) -> Json<ApiHealthResponse> {
let uptime = state.startup_time.elapsed();
let health = ApiHealthResponse::new_healthy(uptime);
Json(health)
@@ -60,7 +43,9 @@ async fn health(state: Arc<ApiStatusState>) -> Json<ApiHealthResponse> {
(status = 200, body = BinaryBuildInformationOwned)
)
)]
async fn build_information(state: Arc<ApiStatusState>) -> Json<BinaryBuildInformationOwned> {
async fn build_information(
State(state): State<ApiStatusState>,
) -> Json<BinaryBuildInformationOwned> {
Json(state.build_information.to_owned())
}
@@ -73,7 +58,7 @@ async fn build_information(state: Arc<ApiStatusState>) -> Json<BinaryBuildInform
)
)]
async fn signer_information(
state: Arc<ApiStatusState>,
State(state): State<ApiStatusState>,
) -> AxumResult<Json<SignerInformationResponse>> {
let signer_state = state.signer_information.as_ref().ok_or_else(|| {
AxumErrorResponse::internal_msg("this api does not expose zk-nym signing functionalities")
+20 -9
View File
@@ -4,12 +4,25 @@
use crate::ecash;
use nym_bin_common::bin_info;
use nym_bin_common::build_information::BinaryBuildInformation;
use std::ops::Deref;
use std::sync::Arc;
use tokio::time::Instant;
pub(crate) mod handlers;
#[derive(Clone)]
pub(crate) struct ApiStatusState {
inner: Arc<ApiStatusStateInner>,
}
impl Deref for ApiStatusState {
type Target = ApiStatusStateInner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub(crate) struct ApiStatusStateInner {
startup_time: Instant,
build_information: BinaryBuildInformation,
signer_information: Option<SignerState>,
@@ -27,15 +40,13 @@ pub(crate) struct SignerState {
}
impl ApiStatusState {
pub fn new() -> Self {
pub fn new(signer_information: Option<SignerState>) -> Self {
ApiStatusState {
startup_time: Instant::now(),
build_information: bin_info!(),
signer_information: None,
inner: Arc::new(ApiStatusStateInner {
startup_time: Instant::now(),
build_information: bin_info!(),
signer_information,
}),
}
}
pub fn add_zk_nym_signer(&mut self, signer_information: SignerState) {
self.signer_information = Some(signer_information)
}
}
+13 -12
View File
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::circulating_supply_api::cache::CirculatingSupplyCache;
use crate::ecash::api_routes::handlers::ecash_routes;
use crate::ecash::client::Client;
use crate::ecash::comm::QueryCommunicationChannel;
use crate::ecash::dkg::controller::keys::{
@@ -21,7 +20,9 @@ use crate::status::{ApiStatusState, SignerState};
use crate::support::caching::cache::SharedCache;
use crate::support::config::helpers::try_load_current_config;
use crate::support::config::Config;
use crate::support::http::state::{AppState, ShutdownHandles, TASK_MANAGER_TIMEOUT_S};
use crate::support::http::state::{
AppState, ForcedRefresh, ShutdownHandles, TASK_MANAGER_TIMEOUT_S,
};
use crate::support::http::RouterBuilder;
use crate::support::nyxd;
use crate::support::storage::runtime_migrations::m001_directory_services_v2_1::migrate_to_directory_services_v2_1;
@@ -136,8 +137,6 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
let described_nodes_cache = SharedCache::<DescribedNodes>::new();
let node_info_cache = unstable::NodeInfoCache::default();
let mut status_state = ApiStatusState::new();
let ecash_contract = nyxd_client
.get_ecash_contract_address()
.await
@@ -159,8 +158,8 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
// if ecash signer is enabled, there are additional constraints on the nym-api,
// such as having sufficient token balance
let router = if config.ecash_signer.enabled {
let cosmos_address = nyxd_client.address().await;
let signer_information = if config.ecash_signer.enabled {
let cosmos_address = nyxd_client.address().await?;
// make sure we have some tokens to cover multisig fees
let balance = nyxd_client.balance(&mix_denom).await?;
@@ -175,20 +174,20 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
.clone()
.map(|u| u.to_string())
.unwrap_or_default();
status_state.add_zk_nym_signer(SignerState {
Some(SignerState {
cosmos_address: cosmos_address.to_string(),
identity: encoded_identity,
announce_address,
ecash_keypair: ecash_keypair_wrapper.clone(),
});
router.nest("/v1/ecash", ecash_routes(Arc::new(ecash_state)))
})
} else {
router
None
};
let router = router.with_state(AppState {
forced_refresh: Default::default(),
forced_refresh: ForcedRefresh::new(
config.topology_cacher.debug.node_describe_allow_illegal_ips,
),
nym_contract_cache: nym_contract_cache_state.clone(),
node_status_cache: node_status_cache_state.clone(),
circulating_supply_cache: circulating_supply_cache.clone(),
@@ -196,6 +195,8 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
described_nodes_cache: described_nodes_cache.clone(),
network_details,
node_info_cache,
api_status: ApiStatusState::new(signer_information),
ecash_state: Arc::new(ecash_state),
});
let task_manager = TaskManager::new(TASK_MANAGER_TIMEOUT_S);
+3
View File
@@ -439,6 +439,8 @@ pub struct TopologyCacherDebug {
pub node_describe_caching_interval: Duration,
pub node_describe_batch_size: usize,
pub node_describe_allow_illegal_ips: bool,
}
impl Default for TopologyCacherDebug {
@@ -447,6 +449,7 @@ impl Default for TopologyCacherDebug {
caching_interval: DEFAULT_TOPOLOGY_CACHE_INTERVAL,
node_describe_caching_interval: DEFAULT_NODE_DESCRIBE_CACHE_INTERVAL,
node_describe_batch_size: DEFAULT_NODE_DESCRIBE_BATCH_SIZE,
node_describe_allow_illegal_ips: false,
}
}
}
+4 -1
View File
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::circulating_supply_api::handlers::circulating_supply_routes;
use crate::ecash::api_routes::handlers::ecash_routes;
use crate::network::handlers::nym_network_routes;
use crate::node_status_api::handlers::node_status_routes;
use crate::nym_contract_cache::handlers::nym_contract_cache_routes;
@@ -16,7 +17,7 @@ use axum::response::Redirect;
use axum::routing::get;
use axum::Router;
use core::net::SocketAddr;
use nym_http_api_common::logging::logger;
use nym_http_api_common::middleware::logging::logger;
use tokio::net::TcpListener;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower_http::cors::CorsLayer;
@@ -62,6 +63,7 @@ impl RouterBuilder {
.nest("/network", nym_network_routes())
.nest("/api-status", status::handlers::api_status_routes())
.nest("/nym-nodes", nym_node_routes())
.nest("/ecash", ecash_routes())
.nest("/unstable", unstable_routes()), // CORS layer needs to be "outside" of routes
);
@@ -70,6 +72,7 @@ impl RouterBuilder {
}
}
#[allow(dead_code)]
pub(crate) fn nest(self, path: &str, router: Router<AppState>) -> Self {
Self {
unfinished_router: self.unfinished_router.nest(path, router),
+27 -1
View File
@@ -2,15 +2,18 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::circulating_supply_api::cache::CirculatingSupplyCache;
use crate::ecash::state::EcashState;
use crate::network::models::NetworkDetails;
use crate::node_describe_cache::DescribedNodes;
use crate::node_status_api::handlers::unstable;
use crate::node_status_api::models::AxumErrorResponse;
use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::{CachedRewardedSet, NymContractCache};
use crate::status::ApiStatusState;
use crate::support::caching::cache::SharedCache;
use crate::support::caching::Cache;
use crate::support::storage;
use axum::extract::FromRef;
use nym_api_requests::models::{GatewayBondAnnotated, MixNodeBondAnnotated, NodeAnnotation};
use nym_mixnet_contract_common::NodeId;
use nym_task::TaskManager;
@@ -80,14 +83,37 @@ pub(crate) struct AppState {
pub(crate) described_nodes_cache: SharedCache<DescribedNodes>,
pub(crate) network_details: NetworkDetails,
pub(crate) node_info_cache: unstable::NodeInfoCache,
pub(crate) api_status: ApiStatusState,
// todo: refactor it into inner: Arc<EcashStateInner>
pub(crate) ecash_state: Arc<EcashState>,
}
#[derive(Clone, Default)]
impl FromRef<AppState> for ApiStatusState {
fn from_ref(app_state: &AppState) -> Self {
app_state.api_status.clone()
}
}
impl FromRef<AppState> for Arc<EcashState> {
fn from_ref(app_state: &AppState) -> Self {
app_state.ecash_state.clone()
}
}
#[derive(Clone)]
pub(crate) struct ForcedRefresh {
pub(crate) allow_all_ip_addresses: bool,
pub(crate) refreshes: Arc<RwLock<HashMap<NodeId, OffsetDateTime>>>,
}
impl ForcedRefresh {
pub(crate) fn new(allow_all_ip_addresses: bool) -> ForcedRefresh {
ForcedRefresh {
allow_all_ip_addresses,
refreshes: Arc::new(Default::default()),
}
}
pub(crate) async fn last_refreshed(&self, node_id: NodeId) -> Option<OffsetDateTime> {
self.refreshes.read().await.get(&node_id).copied()
}
+14 -16
View File
@@ -77,16 +77,6 @@ macro_rules! nyxd_query {
}};
}
macro_rules! nyxd_signing_shared {
($self:expr, $($op:tt)*) => {{
let guard = $self.inner.read().await;
match &*guard {
$crate::support::nyxd::ClientInner::Signing(client) => client.$($op)*,
$crate::support::nyxd::ClientInner::Query(_) => panic!("attempted to use a signing method on a query client"),
}
}};
}
macro_rules! nyxd_signing {
($self:expr, $($op:tt)*) => {{
let guard = $self.inner.write().await;
@@ -140,13 +130,19 @@ impl Client {
self.inner.read().await
}
pub(crate) async fn client_address(&self) -> AccountId {
nyxd_signing_shared!(self, address())
pub(crate) async fn client_address(&self) -> Option<AccountId> {
let guard = self.inner.read().await;
match &*guard {
ClientInner::Signing(client) => Some(client.address()),
ClientInner::Query(_) => None,
}
}
pub(crate) async fn balance<S: Into<String>>(&self, denom: S) -> Result<Coin, NyxdError> {
let address = self.client_address().await;
let denom = denom.into();
let Some(address) = self.client_address().await else {
return Ok(Coin::new(0, denom));
};
let balance = nyxd_query!(self, get_balance(&address, denom.clone()).await?);
match balance {
@@ -394,8 +390,10 @@ impl Client {
#[async_trait]
impl crate::ecash::client::Client for Client {
async fn address(&self) -> AccountId {
self.client_address().await
async fn address(&self) -> Result<AccountId, EcashError> {
self.client_address()
.await
.ok_or(EcashError::ChainSignerNotEnabled)
}
async fn dkg_contract_address(&self) -> Result<AccountId, EcashError> {
@@ -481,7 +479,7 @@ impl crate::ecash::client::Client for Client {
async fn get_self_registered_dealer_details(
&self,
) -> crate::ecash::error::Result<DealerDetailsResponse> {
let self_address = &self.address().await;
let self_address = &self.address().await?;
Ok(nyxd_query!(self, get_dealer_details(self_address).await?))
}
+96 -96
View File
@@ -7,6 +7,7 @@ use crate::support::storage::models::{
ActiveGateway, ActiveMixnode, GatewayDetails, HistoricalUptime, MixnodeDetails, NodeStatus,
RewardingReport, TestedGatewayStatus, TestedMixnodeStatus, TestingRoute,
};
use crate::support::storage::DbIdCache;
use nym_mixnet_contract_common::{EpochId, IdentityKey, NodeId};
use nym_types::monitoring::NodeResult;
use sqlx::FromRow;
@@ -51,24 +52,7 @@ impl AvgGatewayReliability {
// all SQL goes here
impl StorageManager {
pub(crate) async fn get_mixnode_mix_ids_by_identity(
&self,
identity: &str,
) -> Result<Vec<NodeId>, sqlx::Error> {
let ids = sqlx::query!(
r#"SELECT mix_id as "mix_id: NodeId" FROM mixnode_details WHERE identity_key = ?"#,
identity
)
.fetch_all(&self.connection_pool)
.await?
.into_iter()
.map(|row| row.mix_id)
.collect();
Ok(ids)
}
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgMixnodeReliability>, sqlx::Error> {
@@ -77,7 +61,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
@@ -86,7 +70,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_mix_reliability_in_time_interval(
pub(super) async fn get_all_avg_mix_reliability_in_time_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -114,7 +98,7 @@ impl StorageManager {
Ok(result)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_interval(
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -147,7 +131,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_database_id(
pub(super) async fn get_mixnode_database_id(
&self,
mix_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -159,7 +143,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_database_id(
pub(super) async fn get_gateway_database_id(
&self,
node_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -172,7 +156,7 @@ impl StorageManager {
}
/// Tries to obtain row id of given gateway given its identity
pub(crate) async fn get_gateway_database_id_by_identity(
pub(super) async fn get_gateway_database_id_by_identity(
&self,
identity: &str,
) -> Result<Option<i64>, sqlx::Error> {
@@ -187,7 +171,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_node_id_from_identity_key(
pub(super) async fn get_gateway_node_id_from_identity_key(
&self,
identity: &str,
) -> Result<Option<NodeId>, sqlx::Error> {
@@ -202,7 +186,7 @@ impl StorageManager {
Ok(node_id)
}
pub(crate) async fn get_gateway_identity_key(
pub(super) async fn get_gateway_identity_key(
&self,
node_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -222,7 +206,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_identity_key(
pub(super) async fn get_mixnode_identity_key(
&self,
mix_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -244,7 +228,7 @@ impl StorageManager {
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_mixnode_statuses_since(
pub(super) async fn get_mixnode_statuses_since(
&self,
mix_id: NodeId,
timestamp: i64,
@@ -272,7 +256,7 @@ impl StorageManager {
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_gateway_statuses_since(
pub(super) async fn get_gateway_statuses_since(
&self,
node_id: NodeId,
timestamp: i64,
@@ -298,7 +282,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_historical_uptimes(
pub(super) async fn get_mixnode_historical_uptimes(
&self,
mix_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -336,7 +320,7 @@ impl StorageManager {
/// # Arguments
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
pub(crate) async fn get_gateway_historical_uptimes(
pub(super) async fn get_gateway_historical_uptimes(
&self,
node_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -369,7 +353,7 @@ impl StorageManager {
Ok(uptimes)
}
pub(crate) async fn get_historical_mix_uptime_on(
pub(super) async fn get_historical_mix_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -393,7 +377,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_historical_gateway_uptime_on(
pub(super) async fn get_historical_gateway_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -424,7 +408,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_mixnode_statuses_by_database_id(
pub(super) async fn get_mixnode_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -445,7 +429,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_average_reliability_in_interval(
pub(super) async fn get_mixnode_average_reliability_in_interval(
&self,
id: i64,
start: i64,
@@ -507,7 +491,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_gateway_statuses_by_database_id(
pub(super) async fn get_gateway_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -534,27 +518,36 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `mixnode_results`: reliability results of each node that got tested.
pub(crate) async fn submit_mixnode_statuses(
pub(super) async fn submit_mixnode_statuses(
&self,
timestamp: i64,
mixnode_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
let mut tx = self.connection_pool.begin().await?;
for mixnode_result in mixnode_results {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
let mixnode_id = match id_cache.mixnode_db_id(mixnode_result.node_id) {
Some(id) => id,
None => {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_mixnode_db_id(mixnode_result.node_id, mixnode_id);
mixnode_id
}
};
// insert the actual status
sqlx::query!(
@@ -573,7 +566,7 @@ impl StorageManager {
tx.commit().await
}
pub(crate) async fn submit_mixnode_statuses_v2(
pub(super) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -620,10 +613,11 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `gateway_results`: reliability results of each node that got tested.
pub(crate) async fn submit_gateway_statuses(
pub(super) async fn submit_gateway_statuses(
&self,
timestamp: i64,
gateway_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
@@ -631,39 +625,45 @@ impl StorageManager {
for gateway_result in gateway_results {
// if gateway info doesn't exist, insert it and get its id
// same ID "problem" as described for mixnode insertion
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
let gateway_id = match id_cache.gateway_db_id(gateway_result.node_id) {
Some(id) => id,
None => {
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_gateway_db_id(gateway_result.node_id, gateway_id);
gateway_id
}
};
// insert the actual status
sqlx::query!(
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
}
// finally commit the transaction
tx.commit().await
}
pub(crate) async fn submit_gateway_statuses_v2(
pub(super) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -714,7 +714,7 @@ impl StorageManager {
/// # Arguments
///
/// * `testing_route`: test route used for this particular network monitor run.
pub(crate) async fn submit_testing_route_used(
pub(super) async fn submit_testing_route_used(
&self,
testing_route: TestingRoute,
) -> Result<(), sqlx::Error> {
@@ -742,7 +742,7 @@ impl StorageManager {
///
/// * `db_mixnode_id`: id (as saved in the database) of the mixnode.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_mixnode_testing_route_presence_count_since(
pub(super) async fn get_mixnode_testing_route_presence_count_since(
&self,
db_mixnode_id: i64,
since: i64,
@@ -781,7 +781,7 @@ impl StorageManager {
///
/// * `gateway_id`: id (as saved in the database) of the gateway.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_gateway_testing_route_presence_count_since(
pub(super) async fn get_gateway_testing_route_presence_count_since(
&self,
gateway_id: i64,
since: i64,
@@ -813,7 +813,7 @@ impl StorageManager {
}
/// Checks whether there are already any historical uptimes with this particular date.
pub(crate) async fn check_for_historical_uptime_existence(
pub(super) async fn check_for_historical_uptime_existence(
&self,
today_iso_8601: &str,
) -> Result<bool, sqlx::Error> {
@@ -833,7 +833,7 @@ impl StorageManager {
/// * `node_id`: id of the mixnode (as inserted in `mixnode_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_mixnode_historical_uptime(
pub(super) async fn insert_mixnode_historical_uptime(
&self,
mix_id: i64,
date: &str,
@@ -855,7 +855,7 @@ impl StorageManager {
/// * `node_id`: id of the gateway (as inserted in `gateway_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_gateway_historical_uptime(
pub(super) async fn insert_gateway_historical_uptime(
&self,
db_id: i64,
date: &str,
@@ -876,7 +876,7 @@ impl StorageManager {
/// # Arguments
///
/// * `timestamp`: unix timestamp at which the monitor test run has occurred
pub(crate) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
pub(super) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
let res = sqlx::query!("INSERT INTO monitor_run(timestamp) VALUES (?)", timestamp)
.execute(&self.connection_pool)
.await?;
@@ -889,7 +889,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_monitor_runs_count(
pub(super) async fn get_monitor_runs_count(
&self,
since: i64,
until: i64,
@@ -911,7 +911,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_mixnode_statuses(
pub(super) async fn purge_old_mixnode_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -927,7 +927,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_gateway_statuses(
pub(super) async fn purge_old_gateway_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -944,7 +944,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given mixnode is active
/// * `until`: indicates the upper bound timestamp for deciding whether given mixnode is active
pub(crate) async fn get_all_active_mixnodes_in_interval(
pub(super) async fn get_all_active_mixnodes_in_interval(
&self,
since: i64,
until: i64,
@@ -978,7 +978,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given gateway is active
/// * `until`: indicates the upper bound timestamp for deciding whether given gateway is active
pub(crate) async fn get_all_active_gateways_in_interval(
pub(super) async fn get_all_active_gateways_in_interval(
&self,
since: i64,
until: i64,
@@ -1025,7 +1025,7 @@ impl StorageManager {
///
/// * `report`: report to insert into the database
#[allow(unused)]
pub(crate) async fn insert_rewarding_report(
pub(super) async fn insert_rewarding_report(
&self,
report: RewardingReport,
) -> Result<(), sqlx::Error> {
@@ -1044,7 +1044,7 @@ impl StorageManager {
}
#[allow(unused)]
pub(crate) async fn get_rewarding_report(
pub(super) async fn get_rewarding_report(
&self,
absolute_epoch_id: EpochId,
) -> Result<Option<RewardingReport>, sqlx::Error> {
@@ -1069,7 +1069,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_mixnodes_statuses_in_interval(
pub(super) async fn get_all_active_mixnodes_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1102,7 +1102,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_gateways_statuses_in_interval(
pub(super) async fn get_all_active_gateways_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1129,7 +1129,7 @@ impl StorageManager {
Ok(active_day_statuses)
}
pub(crate) async fn get_mixnode_details_by_db_id(
pub(super) async fn get_mixnode_details_by_db_id(
&self,
id: i64,
) -> Result<Option<MixnodeDetails>, sqlx::Error> {
@@ -1142,7 +1142,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_details_by_db_id(
pub(super) async fn get_gateway_details_by_db_id(
&self,
id: i64,
) -> Result<Option<GatewayDetails>, sqlx::Error> {
@@ -1154,7 +1154,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1170,7 +1170,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_mixnode_statuses(
pub(super) async fn get_mixnode_statuses(
&self,
mix_id: NodeId,
limit: u32,
@@ -1206,7 +1206,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1222,7 +1222,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_gateway_statuses(
pub(super) async fn get_gateway_statuses(
&self,
gateway_identity: &str,
limit: u32,
+85 -28
View File
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
use crate::network_monitor::test_route::TestRoute;
use crate::node_status_api::models::{
GatewayStatusReport, GatewayUptimeHistory, HistoricalUptime as ApiHistoricalUptime,
@@ -12,37 +13,71 @@ use crate::storage::models::{NodeStatus, TestingRoute};
use crate::support::storage::models::{
GatewayDetails, HistoricalUptime, MixnodeDetails, TestedGatewayStatus, TestedMixnodeStatus,
};
use dashmap::DashMap;
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::NodeResult;
use sqlx::ConnectOptions;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tracing::log::LevelFilter;
use tracing::{error, info, warn};
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
pub(crate) mod manager;
pub(crate) mod models;
pub(crate) mod runtime_migrations;
#[derive(Default)]
pub(crate) struct DbIdCache {
pub mixnodes_v1: DashMap<NodeId, i64>,
pub gateways_v1: DashMap<NodeId, i64>,
}
impl DbIdCache {
pub(crate) fn mixnode_db_id(&self, node_id: NodeId) -> Option<i64> {
self.mixnodes_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn gateway_db_id(&self, node_id: NodeId) -> Option<i64> {
self.gateways_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn set_mixnode_db_id(&self, node_id: NodeId, db_id: i64) {
self.mixnodes_v1.insert(node_id, db_id);
}
pub(crate) fn set_gateway_db_id(&self, node_id: NodeId, db_id: i64) {
self.gateways_v1.insert(node_id, db_id);
}
}
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub(crate) struct NymApiStorage {
pub manager: StorageManager,
pub db_id_cache: Arc<DbIdCache>,
}
impl NymApiStorage {
pub async fn init<P: AsRef<Path>>(database_path: P) -> Result<Self, NymApiStorageError> {
// TODO: we can inject here more stuff based on our nym-api global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
let connect_opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
.log_statements(LevelFilter::Trace)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
.min_connections(5)
.max_connections(25)
.acquire_timeout(Duration::from_secs(60));
let connection_pool = match pool_opts.connect_with(connect_opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
@@ -59,32 +94,38 @@ impl NymApiStorage {
let storage = NymApiStorage {
manager: StorageManager { connection_pool },
db_id_cache: Arc::new(Default::default()),
};
Ok(storage)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_mix_ids(
pub(crate) async fn get_mixnode_database_id(
&self,
identity: &str,
) -> Result<Vec<NodeId>, NymApiStorageError> {
Ok(self
.manager
.get_mixnode_mix_ids_by_identity(identity)
.await?)
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.mixnode_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_mixnode_database_id(node_id).await? {
self.db_id_cache.set_mixnode_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_latest_mix_id(
pub(crate) async fn get_gateway_database_id(
&self,
identity: &str,
) -> Result<Option<NodeId>, NymApiStorageError> {
Ok(self
.mix_identity_to_mix_ids(identity)
.await?
.into_iter()
.max())
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.gateway_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_gateway_database_id(node_id).await? {
self.db_id_cache.set_gateway_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
@@ -576,7 +617,6 @@ impl NymApiStorage {
// we MUST have those entries in the database, otherwise the route wouldn't have been chosen
// in the first place
let layer1_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_one_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -584,7 +624,6 @@ impl NymApiStorage {
})?;
let layer2_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_two_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -592,7 +631,6 @@ impl NymApiStorage {
})?;
let layer3_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_three_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -600,7 +638,6 @@ impl NymApiStorage {
})?;
let gateway_db_id = self
.manager
.get_gateway_database_id(test_route.gateway().node_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -701,11 +738,11 @@ impl NymApiStorage {
let monitor_run_id = self.manager.insert_monitor_run(now).await?;
self.manager
.submit_mixnode_statuses(now, mixnode_results)
.submit_mixnode_statuses(now, mixnode_results, &self.db_id_cache)
.await?;
self.manager
.submit_gateway_statuses(now, gateway_results)
.submit_gateway_statuses(now, gateway_results, &self.db_id_cache)
.await?;
for test_route in test_routes {
@@ -715,6 +752,26 @@ impl NymApiStorage {
Ok(())
}
pub(crate) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_mixnode_statuses_v2(mixnode_results)
.await?;
Ok(())
}
pub(crate) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_gateway_statuses_v2(gateway_results)
.await?;
Ok(())
}
/// Obtains number of network monitor test runs that have occurred within the specified interval.
///
/// # Arguments
@@ -34,7 +34,6 @@ nym-serde-helpers = { path = "../../common/serde-helpers", features = ["bs58"] }
workspace = true
features = ["tokio"]
[features]
default = ["query-types"]
query-types = ["nym-http-api-common"]
@@ -268,6 +268,9 @@ pub struct WebhookTicketbookWalletSharesRequest {
pub struct TicketbookObtainQueryParams {
pub output: Option<Output>,
#[serde(default)]
pub skip_webhook: bool,
pub include_master_verification_key: bool,
pub include_coin_index_signatures: bool,
@@ -1,6 +1,6 @@
[package]
name = "nym-credential-proxy"
version = "0.1.3"
version = "0.1.6"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -48,6 +48,7 @@ nym-config = { path = "../../common/config" }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric", "rand", "serde"] }
nym-credentials = { path = "../../common/credentials" }
nym-credentials-interface = { path = "../../common/credentials-interface" }
nym-ecash-contract-common = { path = "../../common/cosmwasm-smart-contracts/ecash-contract" }
nym-http-api-common = { path = "../../common/http-api-common", features = ["utoipa"] }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-network-defaults = { path = "../../common/network-defaults" }
@@ -30,6 +30,6 @@ RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/nym-credential-proxy/target/release/nym-credential-proxy ./
COPY --from=builder /usr/src/nym/target/release/nym-credential-proxy ./
ENTRYPOINT [ "/nym/nym-credential-proxy" ]
@@ -55,6 +55,15 @@ pub struct Cli {
)]
pub(crate) http_auth_token: String,
/// Specify the maximum number of deposits the credential proxy can make in a single transaction
/// (default: 32)
#[clap(
long,
env = "NYM_CREDENTIAL_PROXY_MAX_CONCURRENT_DEPOSITS",
default_value_t = 32
)]
pub(crate) max_concurrent_deposits: usize,
#[clap(long, env = "NYM_CREDENTIAL_PROXY_PERSISTENT_STORAGE_STORAGE")]
pub(crate) persistent_storage_path: Option<PathBuf>,
}
@@ -1,6 +1,7 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposit_maker::{DepositRequest, DepositResponse};
use crate::error::VpnApiError;
use crate::http::state::ApiState;
use crate::storage::models::BlindedShares;
@@ -14,21 +15,48 @@ use nym_credentials::IssuanceTicketBook;
use nym_credentials_interface::Base58;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::ecash::BlindSignRequestBody;
use nym_validator_client::nyxd::contract_traits::EcashSigningClient;
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::Coin;
use rand::rngs::OsRng;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, error, info, instrument};
use tokio::sync::{oneshot, Mutex};
use tokio::time::{timeout, Instant};
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
// use the same type alias as our contract without importing the whole thing just for this single line
pub type NodeId = u64;
#[instrument(skip(state), ret, err(Display))]
async fn make_deposit(
state: &ApiState,
pub_key: ed25519::PublicKey,
deposit_amount: &Coin,
) -> Result<DepositResponse, VpnApiError> {
let start = Instant::now();
let (on_done_tx, on_done_rx) = oneshot::channel();
let request = DepositRequest::new(pub_key, deposit_amount, on_done_tx);
state.request_deposit(request).await;
let time_taken = start.elapsed();
let formatted = humantime::format_duration(time_taken);
let Ok(deposit_response) = on_done_rx.await else {
error!("failed to receive deposit response: the corresponding sender channel got dropped by the DepositMaker!");
return Err(VpnApiError::DepositFailure);
};
if time_taken > Duration::from_secs(20) {
warn!("attempting to resolve deposit request took {formatted}. perhaps the buffer is too small or the process/chain is overloaded?")
} else {
debug!("attempting to resolve deposit request took {formatted}")
}
deposit_response.ok_or(VpnApiError::DepositFailure)
}
#[instrument(
skip(state, request_data, request, requested_on),
fields(
@@ -59,25 +87,12 @@ pub(crate) async fn try_obtain_wallet_shares(
.await?;
let ecash_api_clients = state.ecash_clients(epoch).await?.clone();
let chain_write_permit = state.start_chain_tx().await;
let DepositResponse {
deposit_id,
tx_hash,
} = make_deposit(state, *ed25519_keypair.public_key(), &deposit_amount).await?;
info!("starting the deposit!");
// TODO: batch those up
// TODO: batch those up
let deposit_res = chain_write_permit
.make_ticketbook_deposit(
ed25519_keypair.public_key().to_base58_string(),
deposit_amount.clone(),
None,
)
.await?;
// explicitly drop it here so other tasks could start using it
drop(chain_write_permit);
let deposit_id = deposit_res.parse_singleton_u32_contract_data()?;
let tx_hash = deposit_res.transaction_hash;
info!(deposit_id = %deposit_id, tx_hash = %tx_hash, "deposit finished");
info!(deposit_id = %deposit_id, "deposit finished");
// store the deposit information so if we fail, we could perhaps still reuse it for another issuance
state
@@ -342,6 +357,7 @@ pub(crate) async fn try_obtain_blinded_ticketbook_async(
params: TicketbookObtainQueryParams,
pending: BlindedShares,
) {
let skip_webhook = params.skip_webhook;
if let Err(err) = try_obtain_blinded_ticketbook_async_inner(
&state,
request,
@@ -352,6 +368,11 @@ pub(crate) async fn try_obtain_blinded_ticketbook_async(
)
.await
{
if skip_webhook {
info!(uuid = %request,"the webhook is not going to be called for this request");
return;
}
// post to the webhook to notify of errors on this side
if let Err(webhook_err) = try_trigger_webhook_request_for_error(
&state,
@@ -0,0 +1,205 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::VpnApiError;
use crate::http::state::ChainClient;
use nym_crypto::asymmetric::ed25519;
use nym_ecash_contract_common::deposit::DepositId;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::nyxd::{Coin, Hash};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
#[derive(Debug)]
pub(crate) struct DepositResponse {
pub tx_hash: Hash,
pub deposit_id: DepositId,
}
pub(crate) struct DepositRequest {
pubkey: ed25519::PublicKey,
deposit_amount: Coin,
on_done: oneshot::Sender<Option<DepositResponse>>,
}
impl DepositRequest {
pub(crate) fn new(
pubkey: ed25519::PublicKey,
deposit_amount: &Coin,
on_done: oneshot::Sender<Option<DepositResponse>>,
) -> Self {
DepositRequest {
pubkey,
deposit_amount: deposit_amount.clone(),
on_done,
}
}
}
pub(crate) type DepositRequestReceiver = mpsc::Receiver<DepositRequest>;
pub(crate) fn new_control_channels(
max_concurrent_deposits: usize,
) -> (DepositRequestSender, DepositRequestReceiver) {
let (tx, rx) = mpsc::channel(max_concurrent_deposits);
(tx.into(), rx)
}
#[derive(Debug, Clone)]
pub struct DepositRequestSender(mpsc::Sender<DepositRequest>);
impl From<mpsc::Sender<DepositRequest>> for DepositRequestSender {
fn from(inner: mpsc::Sender<DepositRequest>) -> Self {
DepositRequestSender(inner)
}
}
impl DepositRequestSender {
pub(crate) async fn request_deposit(&self, request: DepositRequest) {
if self.0.send(request).await.is_err() {
error!("failed to request deposit: the DepositMaker must have died!")
}
}
}
pub(crate) struct DepositMaker {
client: ChainClient,
max_concurrent_deposits: usize,
deposit_request_sender: DepositRequestSender,
deposit_request_receiver: DepositRequestReceiver,
short_sha: &'static str,
cancellation_token: CancellationToken,
}
impl DepositMaker {
pub(crate) fn new(
short_sha: &'static str,
client: ChainClient,
max_concurrent_deposits: usize,
cancellation_token: CancellationToken,
) -> Self {
let (deposit_request_sender, deposit_request_receiver) =
new_control_channels(max_concurrent_deposits);
DepositMaker {
client,
max_concurrent_deposits,
deposit_request_sender,
deposit_request_receiver,
short_sha,
cancellation_token,
}
}
pub(crate) fn deposit_request_sender(&self) -> DepositRequestSender {
self.deposit_request_sender.clone()
}
pub(crate) async fn process_deposit_requests(
&mut self,
requests: Vec<DepositRequest>,
) -> Result<(), VpnApiError> {
let chain_write_permit = self.client.start_chain_tx().await;
info!("starting deposits");
let mut contents = Vec::new();
let mut replies = Vec::new();
for request in requests {
// check if the channel is still open in case the receiver client has cancelled the request
if request.on_done.is_closed() {
warn!(
"the request for deposit from {} got cancelled",
request.pubkey
);
continue;
}
contents.push((request.pubkey.to_base58_string(), request.deposit_amount));
replies.push(request.on_done);
}
let deposits_res = chain_write_permit
.make_deposits(self.short_sha, contents)
.await;
let execute_res = match deposits_res {
Ok(res) => res,
Err(err) => {
// we have to let requesters know the deposit(s) failed
for reply in replies {
if reply.send(None).is_err() {
warn!("one of the deposit requesters has been terminated")
}
}
return Err(err);
}
};
let tx_hash = execute_res.transaction_hash;
info!("{} deposits made in transaction: {tx_hash}", replies.len());
let contract_data = match execute_res.to_contract_data() {
Ok(contract_data) => contract_data,
Err(err) => {
// that one is tricky. deposits technically got made, but we somehow failed to parse response,
// in this case terminate the proxy with 0 exit code so it wouldn't get automatically restarted
// because it requires some serious MANUAL intervention
error!("CRITICAL FAILURE: failed to parse out deposit information from the contract transaction. either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually. error was: {err}");
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
};
if contract_data.len() != replies.len() {
// another critical failure, that one should be quite impossible and thus has to be manually inspected
error!("CRITICAL FAILURE: failed to parse out all deposit information from the contract transaction. got {} responses while we sent {} deposits! either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually", contract_data.len(), replies.len());
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
for (reply_channel, response) in replies.into_iter().zip(contract_data) {
let response_index = response.message_index;
let deposit_id = match response.parse_singleton_u32_contract_data() {
Ok(deposit_id) => deposit_id,
Err(err) => {
// another impossibility
error!("CRITICAL FAILURE: failed to parse out deposit id out of the response at index {response_index}: {err}. either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually");
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
};
if reply_channel
.send(Some(DepositResponse {
deposit_id,
tx_hash,
}))
.is_err()
{
warn!("one of the deposit requesters has been terminated. deposit {deposit_id} will remain unclaimed!");
// this shouldn't happen as the requester task shouldn't be killed, but it's not a critical failure
// we just lost some tokens, but it's not an undefined on-chain behaviour
}
}
Ok(())
}
pub async fn run_forever(mut self) {
info!("starting the deposit maker task");
loop {
let mut receive_buffer = Vec::with_capacity(self.max_concurrent_deposits);
tokio::select! {
_ = self.cancellation_token.cancelled() => {
break
}
received = self.deposit_request_receiver.recv_many(&mut receive_buffer, self.max_concurrent_deposits) => {
debug!("received {received} deposit requests");
if let Err(err) = self.process_deposit_requests(receive_buffer).await {
error!("failed to process received deposit requests: {err}")
}
}
}
}
}
}
@@ -115,6 +115,9 @@ pub enum VpnApiError {
#[error("timed out while attempting to obtain partial wallet from {client_repr}")]
EcashApiRequestTimeout { client_repr: String },
#[error("failed to create deposit")]
DepositFailure,
}
impl VpnApiError {
@@ -1,64 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::{
extract::{ConnectInfo, Request},
http::{
header::{HOST, USER_AGENT},
HeaderValue,
},
middleware::Next,
response::IntoResponse,
};
use colored::*;
use std::net::SocketAddr;
use tokio::time::Instant;
use tracing::info;
/// Simple logger for requests
pub async fn logger(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
req: Request,
next: Next,
) -> impl IntoResponse {
let method = req.method().to_string().green();
let uri = req.uri().to_string().blue();
let agent = header_map(
req.headers().get(USER_AGENT),
"Unknown User Agent".to_string(),
);
let host = header_map(req.headers().get(HOST), "Unknown Host".to_string());
let start = Instant::now();
let res = next.run(req).await;
let time_taken = start.elapsed();
let status = res.status();
let print_status = if status.is_client_error() || status.is_server_error() {
status.to_string().red()
} else if status.is_success() {
status.to_string().green()
} else {
status.to_string().yellow()
};
let taken = "time taken".bold();
let time_taken = match time_taken.as_millis() {
ms if ms > 500 => format!("{taken}: {}", format!("{ms}ms").red()),
ms if ms > 200 => format!("{taken}: {}", format!("{ms}ms").yellow()),
ms if ms > 50 => format!("{taken}: {}", format!("{ms}ms").bright_yellow()),
ms => format!("{taken}: {ms}ms"),
};
let agent_str = "agent".bold();
info!("[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}");
res
}
fn header_map(header: Option<&HeaderValue>, msg: String) -> String {
header
.map(|x| x.to_str().unwrap_or(&msg).to_string())
.unwrap_or(msg)
}
@@ -1,5 +0,0 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod auth;
pub mod logging;
@@ -10,7 +10,6 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
pub mod helpers;
pub mod middleware;
pub mod router;
pub mod state;
pub mod types;
@@ -22,10 +21,15 @@ pub struct HttpServer {
}
impl HttpServer {
pub fn new(bind_address: SocketAddr, state: ApiState, auth_token: String) -> Self {
pub fn new(
bind_address: SocketAddr,
state: ApiState,
auth_token: String,
cancellation: CancellationToken,
) -> Self {
HttpServer {
bind_address,
cancellation: state.cancellation_token(),
cancellation,
router: build_router(state, auth_token),
}
}

Some files were not shown because too many files have changed in this diff Show More