Compare commits

...

14 Commits

Author SHA1 Message Date
Tommy Verrall 4d609a071f transaction records for payment watcher
- bearer token not implemented yet
- needs to be ran and tested
- maybe overkill passing two params to retrieve the records - maybe try just one
2025-02-25 13:56:53 +01:00
dynco-nym 9de5d7213a Another total_stake SQL fix (#5516) 2025-02-24 18:06:03 +01:00
dynco-nym 94eb362a71 Fix total_stake on SQL update (#5514) 2025-02-24 20:50:42 +05:30
dependabot[bot] 0f615f48f2 build(deps): bump the patch-updates group with 2 updates (#5505) 2025-02-24 13:33:20 +01:00
Bogdan-Ștefan Neacşu d511611641 Connection fd callback before actual connection (#5494) 2025-02-24 14:23:43 +02:00
Jędrzej Stuczyński 17d3ff2d77 feat: use ct_eq for checking bearer token (#5501) 2025-02-24 09:04:34 +00:00
dynco-nym dd3dcfa7fe Treat gateways as Nym Nodes (#5504)
* Generate GW moniker if missing

Beside that:
- clear up gw nomenclature
- adjust counting when legacy nodes are present in nym node APIs
- create utils module

* Store gatewy descriptions

* Clippy & version
2025-02-21 20:32:39 +01:00
dynco-nym 86ea2d23cb Update version in Cargo.toml (#5503) 2025-02-21 16:16:44 +01:00
dynco-nym 42a37442e8 Fix stats bug & remove HM caching (#5495)
* Fix stats bug & remove HM caching

* Use variable for better clarity

* Minor fixes
2025-02-21 16:05:26 +01:00
dynco-nym 6b24f081e1 Add extra args for the probe (#5499) 2025-02-21 12:14:37 +01:00
Jędrzej Stuczyński 6e5d0dac1b feature: allow nym-nodes to understand future version of sphinx packets (#5496)
* use updated sphinx crate

* updated outfox usage of keygen in tests

* use x25519 in outfox

* remove redundant constructor

* adjusted key convertion traits
2025-02-21 11:06:07 +00:00
mfahampshire 5f2740bf66 add vercel config file: turn off autodeploy on master (#5490) 2025-02-19 11:03:04 +00:00
Tommy Verrall ecb15034d3 Merge pull request #5489 from nymtech/fix/contracts-cargo-lock
fix: Cargo.lock for contracts
2025-02-19 11:41:30 +01:00
Fran Arbanas bd49c222a3 fix: Cargo.lock for contracts 2025-02-19 09:06:34 +01:00
69 changed files with 1758 additions and 1727 deletions
Generated
+664 -626
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -322,7 +322,7 @@ serde_with = "3.9.0"
serde_yaml = "0.9.25"
sha2 = "0.10.8"
si-scale = "0.2.3"
sphinx-packet = "0.1.1"
sphinx-packet = "0.3.1"
sqlx = "0.7.4"
strum = "0.26"
strum_macros = "0.26"
@@ -330,7 +330,7 @@ subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
tap = "1.0.1"
tar = "0.4.43"
tar = "0.4.44"
tempfile = "3.15"
thiserror = "2.0"
time = "0.3.37"
@@ -204,15 +204,15 @@ impl<C, St> GatewayClient<C, St> {
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(&self.gateway_address).await?;
let (ws_stream, _) = connect_async(
&self.gateway_address,
#[cfg(unix)]
self.connection_fd_callback.clone(),
)
.await?;
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -1,6 +1,11 @@
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
sync::Arc,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
@@ -11,7 +16,10 @@ use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
@@ -37,14 +45,41 @@ pub(crate) async fn connect_async(
}
};
let stream = TcpStream::connect(&sock_addrs[..]).await.map_err(|error| {
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error.into(),
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
address: endpoint.to_owned(),
});
for sock_addr in sock_addrs {
let socket = if sock_addr.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}
})?;
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
})?;
tokio_tungstenite::client_async_tls(endpoint, stream)
#[cfg(unix)]
if let Some(callback) = connection_fd_callback.as_ref() {
callback.as_ref()(socket.as_raw_fd());
}
match socket.connect(sock_addr).await {
Ok(s) => {
stream = Ok(s);
break;
}
Err(err) => {
stream = Err(GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
});
continue;
}
}
}
tokio_tungstenite::client_async_tls(endpoint, stream?)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
@@ -43,6 +43,9 @@ pub enum GatewayClientError {
#[error("connection failed: {address}: {source}")]
NetworkConnectionFailed { address: String, source: WsError },
#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },
#[error("Invalid URL: {0}")]
InvalidUrl(String),
+2 -3
View File
@@ -37,11 +37,10 @@ nym-pemstore = { path = "../../common/pemstore", version = "0.3.0" }
rand_chacha = { workspace = true }
[features]
default = ["sphinx"]
default = []
aead = ["dep:aead", "aead/std", "aes-gcm-siv", "generic-array"]
serde = ["dep:serde", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
asymmetric = ["x25519-dalek", "ed25519-dalek", "zeroize"]
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array", "sha2"]
stream_cipher = ["aes", "ctr", "cipher", "generic-array"]
sphinx = ["nym-sphinx-types/sphinx"]
outfox = ["nym-sphinx-types/outfox"]
sphinx = ["nym-sphinx-types/sphinx"]
+18 -100
View File
@@ -202,6 +202,18 @@ impl PemStorableKey for PublicKey {
}
}
impl From<x25519_dalek::PublicKey> for PublicKey {
fn from(public_key: x25519_dalek::PublicKey) -> Self {
PublicKey(public_key)
}
}
impl From<PublicKey> for x25519_dalek::PublicKey {
fn from(public_key: PublicKey) -> Self {
public_key.0
}
}
#[derive(Zeroize, ZeroizeOnDrop)]
pub struct PrivateKey(x25519_dalek::StaticSecret);
@@ -308,109 +320,15 @@ impl PemStorableKey for PrivateKey {
}
}
// compatibility with sphinx keys:
#[cfg(feature = "sphinx")]
impl From<PublicKey> for nym_sphinx_types::PublicKey {
fn from(key: PublicKey) -> Self {
nym_sphinx_types::PublicKey::from(key.to_bytes())
impl From<x25519_dalek::StaticSecret> for PrivateKey {
fn from(secret: x25519_dalek::StaticSecret) -> Self {
PrivateKey(secret)
}
}
#[cfg(feature = "sphinx")]
impl<'a> From<&'a PublicKey> for nym_sphinx_types::PublicKey {
fn from(key: &'a PublicKey) -> Self {
nym_sphinx_types::PublicKey::from((*key).to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl From<nym_sphinx_types::PublicKey> for PublicKey {
fn from(pub_key: nym_sphinx_types::PublicKey) -> Self {
Self(x25519_dalek::PublicKey::from(*pub_key.as_bytes()))
}
}
#[cfg(feature = "sphinx")]
impl From<PrivateKey> for nym_sphinx_types::PrivateKey {
fn from(key: PrivateKey) -> Self {
nym_sphinx_types::PrivateKey::from(key.to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl<'a> From<&'a PrivateKey> for nym_sphinx_types::PrivateKey {
fn from(key: &'a PrivateKey) -> Self {
nym_sphinx_types::PrivateKey::from(key.to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl From<nym_sphinx_types::PrivateKey> for PrivateKey {
fn from(private_key: nym_sphinx_types::PrivateKey) -> Self {
let private_key_bytes = private_key.to_bytes();
assert_eq!(private_key_bytes.len(), PRIVATE_KEY_SIZE);
Self::from_bytes(&private_key_bytes).unwrap()
}
}
#[cfg(test)]
mod sphinx_key_conversion {
use super::*;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;
pub(super) fn test_rng() -> ChaCha20Rng {
let dummy_seed = [42u8; 32];
ChaCha20Rng::from_seed(dummy_seed)
}
const NUM_ITERATIONS: usize = 100;
#[test]
fn works_for_forward_conversion() {
let mut rng = test_rng();
for _ in 0..NUM_ITERATIONS {
let keys = KeyPair::new(&mut rng);
let private = &keys.private_key;
let public = &keys.public_key;
let dummy_remote = KeyPair::new(&mut rng);
let dh1 = private.diffie_hellman(&dummy_remote.public_key);
let public_bytes = public.to_bytes();
let sphinx_private: nym_sphinx_types::PrivateKey = private.into();
let recovered_private = PrivateKey::from(sphinx_private);
let dh2 = recovered_private.diffie_hellman(&dummy_remote.public_key);
let sphinx_public: nym_sphinx_types::PublicKey = public.into();
let recovered_public = PublicKey::from(sphinx_public);
assert_eq!(public_bytes, recovered_public.to_bytes());
// even though the byte representation of the private key changed, the resultant DH is the same
// which is what matters
assert_eq!(dh1, dh2);
}
}
#[test]
fn works_for_backward_conversion() {
for _ in 0..NUM_ITERATIONS {
let (sphinx_private, sphinx_public) = nym_sphinx_types::crypto::keygen();
let private_bytes = sphinx_private.to_bytes();
let public_bytes = sphinx_public.as_bytes();
let private: PrivateKey = sphinx_private.into();
let recovered_sphinx_private: nym_sphinx_types::PrivateKey = private.into();
let public: PublicKey = sphinx_public.into();
let recovered_sphinx_public: nym_sphinx_types::PublicKey = public.into();
assert_eq!(private_bytes, recovered_sphinx_private.to_bytes());
assert_eq!(public_bytes, recovered_sphinx_public.as_bytes());
}
impl AsRef<x25519_dalek::StaticSecret> for PrivateKey {
fn as_ref(&self) -> &x25519_dalek::StaticSecret {
&self.0
}
}
+1
View File
@@ -20,6 +20,7 @@ mime = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_yaml = { workspace = true }
subtle.workspace = true
tower = { workspace = true }
tracing.workspace = true
utoipa = { workspace = true, optional = true }
@@ -7,6 +7,7 @@ use axum::{extract::Request, response::Response};
use futures::future::BoxFuture;
use std::sync::Arc;
use std::task::{Context, Poll};
use subtle::ConstantTimeEq;
use tower::{Layer, Service};
use tracing::{debug, instrument, trace};
use zeroize::Zeroizing;
@@ -76,7 +77,7 @@ impl<S> RequireAuth<S> {
return Err("`Authorization` header must contain non-empty `Bearer` token");
}
if self.bearer_token.as_str() != bearer_token {
if bool::from(self.bearer_token.as_bytes().ct_ne(bearer_token.as_bytes())) {
return Err("`Authorization` header does not contain the correct `Bearer` token");
}
-2
View File
@@ -48,12 +48,10 @@ features = ["sync"]
[features]
default = ["sphinx"]
sphinx = [
"nym-crypto/sphinx",
"nym-sphinx-params/sphinx",
"nym-sphinx-types/sphinx",
]
outfox = [
"nym-crypto/outfox",
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
+1 -1
View File
@@ -8,7 +8,7 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
nym-crypto = { path = "../../crypto", features = ["asymmetric"] } # all addresses are expressed in terms on their crypto keys
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
thiserror = { workspace = true }
@@ -559,7 +559,7 @@ mod tests {
let mut address_bytes = [0; NODE_ADDRESS_LENGTH];
rng.fill_bytes(&mut address_bytes);
let dummy_private = PrivateKey::new_with_rng(rng);
let dummy_private = PrivateKey::random_from_rng(rng);
let pub_key = (&dummy_private).into();
Node {
address: NodeAddressBytes::from_bytes(address_bytes),
+14 -9
View File
@@ -130,28 +130,33 @@ impl Decoder for NymCodec {
mod packet_encoding {
use super::*;
use nym_sphinx_types::{
crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
PrivateKey, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
};
fn random_pubkey() -> nym_sphinx_types::PublicKey {
let private_key = PrivateKey::random();
(&private_key).into()
}
fn make_valid_outfox_packet(size: PacketSize) -> NymPacket {
let (_, node1_pk) = crypto::keygen();
let node1_pk = random_pubkey();
let node1 = Node::new(
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
node1_pk,
);
let (_, node2_pk) = crypto::keygen();
let node2_pk = random_pubkey();
let node2 = Node::new(
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
node2_pk,
);
let (_, node3_pk) = crypto::keygen();
let node3_pk = random_pubkey();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pk,
);
let (_, node4_pk) = crypto::keygen();
let node4_pk = random_pubkey();
let node4 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node4_pk,
@@ -170,17 +175,17 @@ mod packet_encoding {
}
fn make_valid_sphinx_packet(size: PacketSize) -> NymPacket {
let (_, node1_pk) = crypto::keygen();
let node1_pk = random_pubkey();
let node1 = Node::new(
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
node1_pk,
);
let (_, node2_pk) = crypto::keygen();
let node2_pk = random_pubkey();
let node2 = Node::new(
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
node2_pk,
);
let (_, node3_pk) = crypto::keygen();
let node3_pk = random_pubkey();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pk,
+82 -23
View File
@@ -4,8 +4,10 @@ use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressE
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymPacketError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacket, SphinxError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacketData, SphinxError,
Version as SphinxPacketVersion,
};
use std::fmt::Display;
use thiserror::Error;
use crate::packet::FramedNymPacket;
@@ -13,12 +15,38 @@ use nym_metrics::nanos;
use nym_sphinx_forwarding::packet::MixPacket;
#[derive(Debug)]
pub enum MixProcessingResult {
pub enum MixProcessingResultData {
/// Contains unwrapped data that should first get delayed before being sent to next hop.
ForwardHop(MixPacket, Option<SphinxDelay>),
ForwardHop {
packet: MixPacket,
delay: Option<SphinxDelay>,
},
/// Contains all data extracted out of the final hop packet that could be forwarded to the destination.
FinalHop(ProcessedFinalHop),
FinalHop { final_hop_data: ProcessedFinalHop },
}
#[derive(Debug, Copy, Clone)]
pub enum MixPacketVersion {
Outfox,
Sphinx(SphinxPacketVersion),
}
impl Display for MixPacketVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
MixPacketVersion::Outfox => "outfox".fmt(f),
MixPacketVersion::Sphinx(sphinx_version) => {
write!(f, "sphinx-{}", sphinx_version.value())
}
}
}
}
#[derive(Debug)]
pub struct MixProcessingResult {
pub packet_version: MixPacketVersion,
pub processing_data: MixProcessingResultData,
}
type ForwardAck = MixPacket;
@@ -107,37 +135,63 @@ fn perform_final_processing(
) -> Result<MixProcessingResult, PacketProcessingError> {
match packet {
NymProcessedPacket::Sphinx(packet) => {
match packet {
ProcessedPacket::ForwardHop(packet, address, delay) => {
process_forward_hop(NymPacket::Sphinx(*packet), address, delay, packet_type)
}
let processing_data = match packet.data {
ProcessedPacketData::ForwardHop {
next_hop_packet,
next_hop_address,
delay,
} => process_forward_hop(
NymPacket::Sphinx(next_hop_packet),
next_hop_address,
delay,
packet_type,
),
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacket::FinalHop(destination, _, payload) => process_final_hop(
ProcessedPacketData::FinalHop {
destination,
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}
}?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Sphinx(packet.version),
processing_data,
})
}
NymProcessedPacket::Outfox(packet) => {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
process_final_hop(
let processing_data = process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)
)?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data,
})
} else {
let mix_packet = MixPacket::new(
let packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult::ForwardHop(mix_packet, None))
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data: MixProcessingResultData::ForwardHop {
packet,
delay: None,
},
})
}
}
}
@@ -148,14 +202,16 @@ fn process_final_hop(
payload: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
) -> Result<MixProcessingResultData, PacketProcessingError> {
let (forward_ack, message) = split_into_ack_and_message(payload, packet_size, packet_type)?;
Ok(MixProcessingResult::FinalHop(ProcessedFinalHop {
destination,
forward_ack,
message,
}))
Ok(MixProcessingResultData::FinalHop {
final_hop_data: ProcessedFinalHop {
destination,
forward_ack,
message,
},
})
}
fn split_into_ack_and_message(
@@ -211,11 +267,14 @@ fn process_forward_hop(
forward_address: NodeAddressBytes,
delay: SphinxDelay,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
) -> Result<MixProcessingResultData, PacketProcessingError> {
let next_hop_address = NymNodeRoutingAddress::try_from(forward_address)?;
let mix_packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResult::ForwardHop(mix_packet, Some(delay)))
let packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResultData::ForwardHop {
packet,
delay: Some(delay),
})
}
// TODO: what more could we realistically test here?
+2 -2
View File
@@ -16,5 +16,5 @@ nym-sphinx-types = { path = "../types" }
[features]
default = ["sphinx"]
sphinx = ["nym-crypto/sphinx", "nym-sphinx-types/outfox"]
outfox = ["nym-crypto/outfox", "nym-sphinx-types/outfox"]
sphinx = ["nym-sphinx-types/outfox"]
outfox = ["nym-sphinx-types/outfox"]
+17 -7
View File
@@ -1,14 +1,22 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{array::TryFromSliceError, fmt};
use thiserror::Error;
#[cfg(feature = "outfox")]
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
#[cfg(feature = "sphinx")]
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
#[cfg(feature = "outfox")]
pub use nym_outfox::{
constants::MIN_PACKET_SIZE, constants::MIX_PARAMS_LEN, constants::OUTFOX_PACKET_OVERHEAD,
error::OutfoxError,
};
// re-exporting types and constants available in sphinx
#[cfg(feature = "outfox")]
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
#[cfg(feature = "sphinx")]
pub use sphinx_packet::{
constants::{
@@ -21,12 +29,10 @@ pub use sphinx_packet::{
payload::{Payload, PAYLOAD_OVERHEAD_SIZE},
route::{Destination, DestinationAddressBytes, Node, NodeAddressBytes, SURBIdentifier},
surb::{SURBMaterial, SURB},
Error as SphinxError, ProcessedPacket,
version::Version,
version::UPDATED_LEGACY_VERSION,
Error as SphinxError, ProcessedPacket, ProcessedPacketData,
};
#[cfg(feature = "sphinx")]
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
use std::{array::TryFromSliceError, fmt};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum NymPacketError {
@@ -85,8 +91,12 @@ impl NymPacket {
destination: &Destination,
delays: &[Delay],
) -> Result<NymPacket, NymPacketError> {
// FIXME:
// for now explicitly use the legacy version until sufficient number of nodes
// understand both variants
Ok(NymPacket::Sphinx(
SphinxPacketBuilder::new()
.with_version(UPDATED_LEGACY_VERSION)
.with_payload_size(size)
.build_packet(message, route, destination, delays)?,
))
+1 -1
View File
@@ -27,7 +27,7 @@ wasm-bindgen = { workspace = true, optional = true }
## internal
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
nym-crypto = { path = "../crypto" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
nym-sphinx-types = { path = "../nymsphinx/types", features = [
+1 -1
View File
@@ -105,7 +105,7 @@ impl<'a> From<&'a RoutingNode> for SphinxNode {
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
SphinxNode::new(node_address_bytes, node.sphinx_key.into())
}
}
+150 -337
View File
File diff suppressed because it is too large Load Diff
+7
View File
@@ -0,0 +1,7 @@
{
"git": {
"deploymentEnabled": {
"master": false
}
}
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-agent"
version = "1.0.0-rc.1"
version = "1.0.0-rc.2"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -1,9 +1,9 @@
#!/bin/bash
set -eu
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
probe_git_ref="nym-vpn-core-v1.1.0"
probe_git_ref="nym-vpn-core-v1.3.2"
crate_root=$(dirname $(realpath "$0"))
monorepo_root=$(realpath "${crate_root}/../..")
@@ -21,6 +21,7 @@ export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
export NODE_STATUS_AGENT_SERVER_PORT="8000"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT"
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1"
workers=${1:-1}
echo "Running $workers workers in parallel"
@@ -54,7 +55,7 @@ function swarm() {
echo "All agents completed"
}
# copy_gw_probe
copy_gw_probe
build_agent
swarm $workers
@@ -35,6 +35,13 @@ pub(crate) enum Command {
/// path of binary to run
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
probe_path: String,
#[arg(
long,
env = "NODE_STATUS_AGENT_PROBE_EXTRA_ARGS",
value_delimiter = ','
)]
probe_extra_args: Vec<String>,
},
GenerateKeypair {
@@ -51,11 +58,13 @@ impl Args {
server_port,
ns_api_auth_key,
probe_path,
probe_extra_args,
} => run_probe::run_probe(
server_address,
server_port.to_owned(),
ns_api_auth_key,
probe_path,
probe_extra_args,
)
.await
.inspect_err(|err| {
@@ -7,6 +7,7 @@ pub(crate) async fn run_probe(
server_port: u16,
ns_api_auth_key: &str,
probe_path: &str,
probe_extra_args: &Vec<String>,
) -> anyhow::Result<()> {
let auth_key = PrivateKey::from_base58_string(ns_api_auth_key)
.context("Couldn't parse auth key, exiting")?;
@@ -19,7 +20,7 @@ pub(crate) async fn run_probe(
tracing::info!("Probe version:\n{}", version);
if let Some(testrun) = ns_api_client.request_testrun().await? {
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key));
let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key), probe_extra_args);
ns_api_client
.submit_results(testrun.testrun_id, log, testrun.assigned_at_utc)
@@ -29,7 +29,11 @@ impl GwProbe {
}
}
pub(crate) fn run_and_get_log(&self, gateway_key: &Option<String>) -> String {
pub(crate) fn run_and_get_log(
&self,
gateway_key: &Option<String>,
probe_extra_args: &Vec<String>,
) -> String {
let mut command = std::process::Command::new(&self.path);
command.stdout(std::process::Stdio::piped());
@@ -37,6 +41,16 @@ impl GwProbe {
command.arg("--gateway").arg(gateway_id);
}
tracing::info!("Extra args for the probe:");
for arg in probe_extra_args {
let mut split = arg.splitn(2, '=');
let name = split.next().unwrap_or_default();
let value = split.next().unwrap_or_default();
tracing::info!("{} {}", name, value);
command.arg(format!("--{name}")).arg(value);
}
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output() {
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "1.0.0-rc.8"
version = "1.0.2"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -16,11 +16,13 @@ rust-version.workspace = true
ammonia = { workspace = true }
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio", "macros"] }
bip39 = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
envy = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
moka = { workspace = true, features = ["future"] }
nym-contracts-common = { path = "../../common/cosmwasm-smart-contracts/contracts-common" }
nym-bin-common = { path = "../../common/bin-common", features = ["models"] }
@@ -33,6 +35,8 @@ nym-statistics-common = { path = "../../common/statistics" }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-task = { path = "../../common/task" }
nym-node-requests = { path = "../../nym-node/nym-node-requests", features = ["openapi"] }
rand = { workspace = true }
rand_chacha = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -3,7 +3,7 @@
set -e
user_rust_log_preference=$RUST_LOG
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120
@@ -83,9 +83,6 @@ pub(crate) struct Cli {
env = "NYM_NODE_STATUS_API_MAX_AGENT_COUNT"
)]
pub(crate) max_agent_count: i64,
#[clap(long, default_value = "", env = "NYM_NODE_STATUS_API_HM_URL")]
pub(crate) hm_url: String,
}
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
@@ -2,7 +2,7 @@ use std::str::FromStr;
use crate::{
http::{self, models::SummaryHistory},
monitor::NumericalCheckedCast,
utils::NumericalCheckedCast,
};
use anyhow::Context;
use nym_contracts_common::Percent;
@@ -16,7 +16,7 @@ use strum_macros::{EnumString, FromRepr};
use time::{Date, OffsetDateTime};
use utoipa::ToSchema;
pub(crate) struct GatewayRecord {
pub(crate) struct GatewayInsertRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) self_described: String,
@@ -360,14 +360,24 @@ impl TryFrom<GatewaySessionsRecord> for http::models::SessionStats {
}
}
pub(crate) enum MixingNodeKind {
LegacyMixnode,
NymNode,
pub(crate) enum ScrapeNodeKind {
LegacyMixnode { mix_id: i64 },
MixingNymNode { node_id: i64 },
EntryExitNymNode { node_id: i64, identity_key: String },
}
impl ScrapeNodeKind {
pub(crate) fn node_id(&self) -> &i64 {
match self {
ScrapeNodeKind::LegacyMixnode { mix_id } => mix_id,
ScrapeNodeKind::MixingNymNode { node_id } => node_id,
ScrapeNodeKind::EntryExitNymNode { node_id, .. } => node_id,
}
}
}
pub(crate) struct ScraperNodeInfo {
pub node_id: i64,
pub node_kind: MixingNodeKind,
pub node_kind: ScrapeNodeKind,
pub hosts: Vec<String>,
pub http_api_port: i64,
}
@@ -390,6 +400,10 @@ impl ScraperNodeInfo {
urls
}
pub(crate) fn node_id(&self) -> &i64 {
self.node_kind.node_id()
}
}
#[derive(sqlx::Decode, Debug)]
@@ -1,6 +1,8 @@
use std::collections::HashSet;
use crate::{
db::{
models::{GatewayDto, GatewayRecord},
models::{GatewayDto, GatewayInsertRecord},
DbPool,
},
http::models::Gateway,
@@ -30,7 +32,7 @@ pub(crate) async fn select_gateway_identity(
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
gateways: Vec<GatewayInsertRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
for record in gateways {
@@ -98,3 +100,21 @@ pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gatewa
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
pub(crate) async fn get_all_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
SELECT gateway_identity_key
FROM gateways
WHERE bonded = true
"#
)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|record| record.gateway_identity_key)
.collect::<HashSet<_>>();
Ok(items)
}
@@ -1,3 +1,5 @@
use std::collections::HashSet;
use futures_util::TryStreamExt;
use tracing::error;
@@ -83,8 +85,7 @@ pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnod
Ok(items)
}
/// `offset` = slides our fixed-day period further into the past by N days
pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Result<Vec<DailyStats>> {
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
DailyStats,
@@ -115,11 +116,8 @@ pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Resul
WHERE nym_node_daily_mixing_stats.node_id IS NULL
)
GROUP BY date_utc
ORDER BY date_utc DESC
LIMIT 30
OFFSET ?
ORDER BY date_utc ASC
"#,
offset
)
.fetch(&mut *conn)
.try_collect::<Vec<DailyStats>>()
@@ -127,3 +125,21 @@ pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Resul
Ok(items)
}
pub(crate) async fn get_all_mix_ids(pool: &DbPool) -> anyhow::Result<HashSet<i64>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
SELECT mix_id
FROM mixnodes
WHERE bonded = true
"#
)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|record| record.mix_id)
.collect::<HashSet<_>>();
Ok(items)
}
@@ -8,13 +8,15 @@ pub(crate) mod scraper;
mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{get_all_gateways, insert_gateways, select_gateway_identity};
pub(crate) use gateways::{
get_all_gateway_id_keys, get_all_gateways, insert_gateways, select_gateway_identity,
};
pub(crate) use gateways_stats::{delete_old_records, get_sessions_stats, insert_session_records};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{get_all_mixnodes, get_daily_stats, insert_mixnodes};
pub(crate) use mixnodes::{get_all_mix_ids, get_all_mixnodes, get_daily_stats, insert_mixnodes};
pub(crate) use nym_nodes::{get_nym_nodes, insert_nym_nodes};
pub(crate) use packet_stats::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
};
pub(crate) use scraper::{get_mixing_nodes_for_scraping, insert_scraped_node_description};
pub(crate) use scraper::{get_nodes_for_scraping, insert_scraped_node_description};
pub(crate) use summary::{get_summary, get_summary_history};
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use anyhow::Context;
use futures_util::TryStreamExt;
use nym_validator_client::{client::NymNodeDetails, nym_api::SkimmedNode};
use tracing::instrument;
@@ -9,7 +10,7 @@ use crate::{
models::{NymNodeDto, NymNodeInsertRecord},
DbPool,
},
monitor::decimal_to_i64,
utils::decimal_to_i64,
};
pub(crate) async fn get_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<SkimmedNode>> {
@@ -100,7 +101,8 @@ pub(crate) async fn insert_nym_nodes(
record.last_updated_utc,
)
.execute(&mut *conn)
.await?;
.await
.with_context(|| format!("node_id={}", record.node_id))?;
}
Ok(())
@@ -1,27 +1,26 @@
use crate::db::{
models::{MixingNodeKind, NodeStats, ScraperNodeInfo},
models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo},
DbPool,
};
use anyhow::Result;
pub(crate) async fn insert_node_packet_stats(
pool: &DbPool,
node_id: i64,
node_kind: &MixingNodeKind,
node_kind: &ScrapeNodeKind,
stats: &NodeStats,
timestamp_utc: i64,
) -> Result<()> {
let mut conn = pool.acquire().await?;
match node_kind {
MixingNodeKind::LegacyMixnode => {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
sqlx::query!(
r#"
INSERT INTO mixnode_packet_stats_raw (
mix_id, timestamp_utc, packets_received, packets_sent, packets_dropped
) VALUES (?, ?, ?, ?, ?)
"#,
node_id,
mix_id,
timestamp_utc,
stats.packets_received,
stats.packets_sent,
@@ -30,7 +29,8 @@ pub(crate) async fn insert_node_packet_stats(
.execute(&mut *conn)
.await?;
}
MixingNodeKind::NymNode => {
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
sqlx::query!(
r#"
INSERT INTO nym_nodes_packet_stats_raw (
@@ -60,7 +60,7 @@ pub(crate) async fn get_raw_node_stats(
let packets = match node.node_kind {
// if no packets are found, it's fine to assume 0 because that's also
// SQL default value if none provided
MixingNodeKind::LegacyMixnode => {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
sqlx::query_as!(
NodeStats,
r#"
@@ -73,12 +73,13 @@ pub(crate) async fn get_raw_node_stats(
ORDER BY timestamp_utc DESC
LIMIT 1 OFFSET 1
"#,
node.node_id
mix_id
)
.fetch_optional(&mut *conn)
.await?
}
MixingNodeKind::NymNode => {
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
sqlx::query_as!(
NodeStats,
r#"
@@ -91,7 +92,7 @@ pub(crate) async fn get_raw_node_stats(
ORDER BY timestamp_utc DESC
LIMIT 1 OFFSET 1
"#,
node.node_id
node_id
)
.fetch_optional(&mut *conn)
.await?
@@ -110,7 +111,7 @@ pub(crate) async fn insert_daily_node_stats(
let mut conn = pool.acquire().await?;
match node.node_kind {
MixingNodeKind::LegacyMixnode => {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
let total_stake = sqlx::query_scalar!(
r#"
SELECT
@@ -118,7 +119,7 @@ pub(crate) async fn insert_daily_node_stats(
FROM mixnodes
WHERE mix_id = ?
"#,
node.node_id
mix_id
)
.fetch_one(&mut *conn)
.await?;
@@ -136,7 +137,7 @@ pub(crate) async fn insert_daily_node_stats(
packets_sent = mixnode_daily_stats.packets_sent + excluded.packets_sent,
packets_dropped = mixnode_daily_stats.packets_dropped + excluded.packets_dropped
"#,
node.node_id,
mix_id,
date_utc,
total_stake,
packets.packets_received,
@@ -146,7 +147,8 @@ pub(crate) async fn insert_daily_node_stats(
.execute(&mut *conn)
.await?;
}
MixingNodeKind::NymNode => {
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
let total_stake = sqlx::query_scalar!(
r#"
SELECT
@@ -154,7 +156,7 @@ pub(crate) async fn insert_daily_node_stats(
FROM nym_nodes
WHERE node_id = ?
"#,
node.node_id
node_id
)
.fetch_one(&mut *conn)
.await?;
@@ -167,12 +169,12 @@ pub(crate) async fn insert_daily_node_stats(
packets_sent, packets_dropped
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(node_id, date_utc) DO UPDATE SET
total_stake = nym_node_daily_mixing_stats.total_stake + excluded.total_stake,
total_stake = excluded.total_stake,
packets_received = nym_node_daily_mixing_stats.packets_received + excluded.packets_received,
packets_sent = nym_node_daily_mixing_stats.packets_sent + excluded.packets_sent,
packets_dropped = nym_node_daily_mixing_stats.packets_dropped + excluded.packets_dropped
"#,
node.node_id,
node_id,
date_utc,
total_stake,
packets.packets_received,
@@ -1,6 +1,6 @@
use crate::{
db::{
models::{MixingNodeKind, ScraperNodeInfo},
models::{ScrapeNodeKind, ScraperNodeInfo},
queries, DbPool,
},
mixnet_scraper::helpers::NodeDescriptionResponse,
@@ -8,16 +8,36 @@ use crate::{
use anyhow::Result;
use chrono::Utc;
pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
let mut nodes_to_scrape = Vec::new();
let mixnode_ids = queries::get_all_mix_ids(pool).await?;
let gateway_keys = queries::get_all_gateway_id_keys(pool).await?;
let mut entry_exit_nodes = 0;
queries::get_nym_nodes(pool)
.await?
.into_iter()
.for_each(|node| {
// due to polyfilling, Nym nodes table might contain legacy mixnodes
// as well. Mark them as such here.
let node_kind = if mixnode_ids.contains(&node.node_id.into()) {
ScrapeNodeKind::LegacyMixnode {
mix_id: node.node_id.into(),
}
} else if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
entry_exit_nodes += 1;
ScrapeNodeKind::EntryExitNymNode {
node_id: node.node_id.into(),
identity_key: node.ed25519_identity_pubkey.to_base58_string(),
}
} else {
ScrapeNodeKind::MixingNymNode {
node_id: node.node_id.into(),
}
};
nodes_to_scrape.push(ScraperNodeInfo {
node_id: node.node_id.into(),
node_kind: MixingNodeKind::NymNode,
node_kind,
hosts: node
.ip_addresses
.into_iter()
@@ -27,7 +47,8 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
})
});
tracing::debug!("Fetched {} 🌟 nym nodes", nodes_to_scrape.len());
tracing::debug!("Fetched {} 🌟 total nym nodes", nodes_to_scrape.len());
tracing::debug!("Fetched {} 🚪 entry/exit nodes", entry_exit_nodes);
let mut conn = pool.acquire().await?;
let mixnodes = sqlx::query!(
@@ -41,7 +62,7 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
.await?;
drop(conn);
tracing::debug!("Fetched {} 🦖 mixnodes", nodes_to_scrape.len());
tracing::debug!("Fetched {} 🦖 mixnodes", mixnodes.len());
let mut duplicates = 0;
let mut legacy_not_in_nym_node_list = 0;
@@ -49,26 +70,22 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
for mixnode in mixnodes {
if nodes_to_scrape
.iter()
.all(|node| node.node_id != mixnode.node_id)
.all(|node| node.node_id() != &mixnode.node_id)
{
// in case polyfilling on Nym API gets removed, this part ensures
// mixnodes are added to the final list of nodes to scrape
nodes_to_scrape.push(ScraperNodeInfo {
node_kind: ScrapeNodeKind::LegacyMixnode {
mix_id: mixnode.node_id,
},
hosts: vec![mixnode.host],
http_api_port: mixnode.http_api_port,
});
legacy_not_in_nym_node_list += 1;
} else {
duplicates += 1;
}
// technically, mixnodes shouldn't be in nym_nodes table, but it's
// possible due to polyfilling on Nym API
if nodes_to_scrape
.iter()
.all(|node| node.node_id != mixnode.node_id)
{
nodes_to_scrape.push(ScraperNodeInfo {
node_id: mixnode.node_id,
node_kind: MixingNodeKind::LegacyMixnode,
hosts: vec![mixnode.host],
http_api_port: mixnode.http_api_port,
})
}
}
tracing::debug!(
"{}/{} legacy mixnodes already included in nym_node list",
@@ -85,19 +102,16 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
Ok(nodes_to_scrape)
}
// TODO: add stuff for gateways
pub(crate) async fn insert_scraped_node_description(
pool: &DbPool,
node_kind: &MixingNodeKind,
node_id: i64,
node_kind: &ScrapeNodeKind,
description: &NodeDescriptionResponse,
) -> Result<()> {
let timestamp = Utc::now().timestamp();
let mut conn = pool.acquire().await?;
match node_kind {
MixingNodeKind::LegacyMixnode => {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
sqlx::query!(
r#"
INSERT INTO mixnode_description (
@@ -110,7 +124,7 @@ pub(crate) async fn insert_scraped_node_description(
details = excluded.details,
last_updated_utc = excluded.last_updated_utc
"#,
node_id,
mix_id,
description.moniker,
description.website,
description.security_contact,
@@ -120,7 +134,7 @@ pub(crate) async fn insert_scraped_node_description(
.execute(&mut *conn)
.await?;
}
MixingNodeKind::NymNode => {
ScrapeNodeKind::MixingNymNode { node_id } => {
sqlx::query!(
r#"
INSERT INTO nym_node_descriptions (
@@ -143,6 +157,34 @@ pub(crate) async fn insert_scraped_node_description(
.execute(&mut *conn)
.await?;
}
ScrapeNodeKind::EntryExitNymNode { identity_key, .. } => {
sqlx::query!(
r#"
INSERT INTO gateway_description (
gateway_identity_key,
moniker,
website,
security_contact,
details,
last_updated_utc
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (gateway_identity_key) DO UPDATE SET
moniker = excluded.moniker,
website = excluded.website,
security_contact = excluded.security_contact,
details = excluded.details,
last_updated_utc = excluded.last_updated_utc
"#,
identity_key,
description.moniker,
description.website,
description.security_contact,
description.details,
timestamp,
)
.execute(&mut *conn)
.await?;
}
}
Ok(())
@@ -99,7 +99,10 @@ async fn get_stats(
Query(MixStatsQueryParams { offset }): Query<MixStatsQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<Vec<DailyStats>>> {
let offset = offset.unwrap_or(0);
let offset: usize = offset
.unwrap_or(0)
.try_into()
.map_err(|_| HttpError::invalid_input("Offset must be non-negative"))?;
let last_30_days = state
.cache()
.get_mixnode_stats(state.db_pool(), offset)
@@ -17,18 +17,10 @@ pub(crate) async fn start_http_api(
nym_http_cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(
db_pool,
nym_http_cache_ttl,
agent_key_list,
agent_max_count,
hm_url,
)
.await;
let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count).await;
let router = router_builder.with_state(state);
let bind_addr = format!("0.0.0.0:{}", http_port);
@@ -25,11 +25,10 @@ impl AppState {
cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> Self {
Self {
db_pool,
cache: HttpCache::new(cache_ttl, hm_url).await,
cache: HttpCache::new(cache_ttl).await,
agent_key_list,
agent_max_count,
}
@@ -52,96 +51,14 @@ impl AppState {
}
}
#[derive(Debug, Clone)]
struct HistoricMixingStats {
historic_stats: Vec<DailyStats>,
}
impl HistoricMixingStats {
/// Collect historic stats only on initialization. From this point onwards,
/// service will collect its own stats
async fn init(hm_url: String) -> Self {
tracing::info!("Fetching historic mixnode stats from {}", hm_url);
let target_url = format!("{}/v2/mixnodes/stats", hm_url);
if let Ok(response) = reqwest::get(&target_url)
.await
.and_then(|res| res.error_for_status())
.inspect_err(|err| tracing::error!("Failed to fetch cache from HM: {}", err))
{
if let Ok(mut daily_stats) = response.json::<Vec<DailyStats>>().await {
// sorting required for seamless comparison later (descending, newest first)
daily_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
tracing::info!(
"Successfully fetched {} historic entries from {}",
daily_stats.len(),
hm_url
);
return Self {
historic_stats: daily_stats,
};
}
};
tracing::warn!("Failed to get historic daily stats from {}", hm_url);
Self {
historic_stats: Vec::new(),
}
}
/// polyfill with historical data obtained from Harbour Master
fn merge_with_historic_stats(&self, mut new_stats: Vec<DailyStats>) -> Vec<DailyStats> {
// newest first
new_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
// historic stats are only used for dates when we don't have new data
let oldest_date_in_new_stats = new_stats
.last()
.map(|day| day.date_utc.to_owned())
.unwrap_or(String::from("1900-01-01"));
// given 2 arrays
// index historic_stats new_stats
// 0 30-01 31-01
// 1 29-01 30-01
// 2 28-01
// ...
// N 01-01
// cutoff point would be at historic_stats[1]
// (first date smaller than oldest we've already got)
if let Some(cutoff) = self
.historic_stats
.iter()
.position(|elem| elem.date_utc < oldest_date_in_new_stats)
{
// missing data = (all historic data) - (however many days we already have)
let missing_data = self.historic_stats.iter().skip(cutoff).cloned();
// extend new data with missing days
tracing::debug!(
"Polyfilled with {} historic records from {:?} to {:?}",
missing_data.len(),
self.historic_stats.last(),
self.historic_stats.get(cutoff)
);
new_stats.extend(missing_data);
// oldest first
new_stats.into_iter().rev().collect::<Vec<_>>()
} else {
// if all historic data is older than what we've got, don't use it
new_stats
}
}
}
static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
static SESSION_STATS_LIST_KEY: &str = "session-stats";
const MIXNODE_STATS_HISTORY_DAYS: usize = 30;
#[derive(Debug, Clone)]
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
@@ -149,11 +66,10 @@ pub(crate) struct HttpCache {
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
session_stats: Cache<String, Arc<RwLock<Vec<SessionStats>>>>,
mixnode_historic_daily_stats: HistoricMixingStats,
}
impl HttpCache {
pub async fn new(ttl_seconds: u64, hm_url: String) -> Self {
pub async fn new(ttl_seconds: u64) -> Self {
HttpCache {
gateways: Cache::builder()
.max_capacity(2)
@@ -175,7 +91,6 @@ impl HttpCache {
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixnode_historic_daily_stats: HistoricMixingStats::init(hm_url).await,
}
}
@@ -285,26 +200,27 @@ impl HttpCache {
.await
}
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: i64) -> Vec<DailyStats> {
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: usize) -> Vec<DailyStats> {
let mut stats = match self.mixstats.get(MIXSTATS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let new_node_stats = crate::db::queries::get_daily_stats(db, offset)
let new_node_stats = crate::db::queries::get_daily_stats(db)
.await
.unwrap_or_default();
// for every day that's missing, fill it with cached historic data
let mut mixnode_stats = self
.mixnode_historic_daily_stats
.merge_with_historic_stats(new_node_stats);
mixnode_stats.truncate(30);
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
mixnode_stats
.unwrap_or_default()
.into_iter()
.rev()
.collect::<Vec<_>>();
// cache result without offset
self.upsert_mixnode_stats(new_node_stats.clone()).await;
new_node_stats
}
}
};
stats.truncate(MIXNODE_STATS_HISTORY_DAYS + offset);
stats.into_iter().skip(offset).rev().collect()
}
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
@@ -34,6 +34,8 @@ pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
"tower_http",
"axum",
"html5ever",
"hickory_proto",
"hickory_resolver",
];
for crate_name in warn_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
@@ -10,6 +10,7 @@ mod mixnet_scraper;
mod monitor;
mod node_scraper;
mod testruns;
mod utils;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -66,7 +67,6 @@ async fn main() -> anyhow::Result<()> {
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
args.max_agent_count,
args.hm_url,
)
.await
.expect("Failed to start server");
@@ -1,12 +1,15 @@
use crate::db::{
models::{NodeStats, ScraperNodeInfo},
queries::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
insert_scraped_node_description,
use crate::{
db::{
models::{NodeStats, ScraperNodeInfo},
queries::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
insert_scraped_node_description,
},
},
utils::generate_node_name,
};
use ammonia::Builder;
use anyhow::Result;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Datelike, Utc};
use reqwest;
use serde::{Deserialize, Serialize};
@@ -80,22 +83,33 @@ pub fn build_client() -> Result<reqwest::Client> {
.map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))
}
pub fn sanitize_description(description: NodeDescriptionResponse) -> NodeDescriptionResponse {
pub fn sanitize_description(
description: NodeDescriptionResponse,
node_id: i64,
) -> NodeDescriptionResponse {
let mut sanitizer = Builder::new();
sanitizer
.tags(std::collections::HashSet::new())
.generic_attributes(std::collections::HashSet::new())
.url_schemes(std::collections::HashSet::new());
const UNKNOWN: &str = "N/A";
let sanitize_field = |opt: Option<String>| -> Option<String> {
Some(
opt.filter(|s| !s.trim().is_empty())
.map_or_else(|| "N/A".to_string(), |s| sanitizer.clean(&s).to_string()),
.map_or_else(|| UNKNOWN.to_string(), |s| sanitizer.clean(&s).to_string()),
)
};
let mut moniker = sanitize_field(description.moniker);
if let Some(sanitized) = &moniker {
if sanitized == UNKNOWN {
moniker = Some(generate_node_name(node_id));
}
};
NodeDescriptionResponse {
moniker: sanitize_field(description.moniker),
moniker,
website: sanitize_field(description.website),
security_contact: sanitize_field(description.security_contact),
details: sanitize_field(description.details),
@@ -108,18 +122,26 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
let mut description = None;
let mut error = None;
let mut tried_url_list = Vec::new();
for mut url in urls {
url = format!("{}{}", url.trim_end_matches('/'), DESCRIPTION_URL);
tried_url_list.push(url.clone());
match client.get(&url).send().await {
match client
.get(&url)
.send()
.await
// convert 404 and similar to error
.and_then(|res| res.error_for_status())
{
Ok(response) => {
if let Ok(desc) = response.json::<NodeDescriptionResponse>().await {
description = Some(desc);
break;
}
}
Err(e) => error = Some(e),
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
}
}
@@ -128,9 +150,8 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
})?;
let sanitized_description = sanitize_description(description);
insert_scraped_node_description(pool, &node.node_kind, node.node_id, &sanitized_description)
.await?;
let sanitized_description = sanitize_description(description, *node.node_id());
insert_scraped_node_description(pool, &node.node_kind, &sanitized_description).await?;
Ok(())
}
@@ -144,9 +165,11 @@ pub async fn scrape_and_store_packet_stats(
let mut stats = None;
let mut error = None;
let mut tried_url_list = Vec::new();
for mut url in urls {
url = format!("{}{}", url.trim_end_matches('/'), PACKET_STATS_URL);
tried_url_list.push(url.clone());
match client.get(&url).send().await {
Ok(response) => {
@@ -155,18 +178,18 @@ pub async fn scrape_and_store_packet_stats(
break;
}
}
Err(e) => error = Some(e),
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
}
}
let stats = stats.ok_or_else(|| {
let err_msg = error.map_or_else(|| "Unknown error".to_string(), |e| e.to_string());
anyhow::anyhow!("Failed to fetch stats from any URL: {}", err_msg)
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
})?;
let timestamp = Utc::now();
let timestamp_utc = timestamp.timestamp();
insert_node_packet_stats(pool, node.node_id, &node.node_kind, &stats, timestamp_utc).await?;
insert_node_packet_stats(pool, &node.node_kind, &stats, timestamp_utc).await?;
// Update daily stats
update_daily_stats(pool, node, timestamp, &stats).await?;
@@ -8,7 +8,7 @@ use sqlx::SqlitePool;
use tracing::{debug, error, instrument, warn};
use crate::db::models::ScraperNodeInfo;
use crate::db::queries::get_mixing_nodes_for_scraping;
use crate::db::queries::get_nodes_for_scraping;
const DESCRIPTION_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 4);
const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60);
@@ -74,7 +74,7 @@ impl Scraper {
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
let nodes = get_mixing_nodes_for_scraping(pool).await?;
let nodes = get_nodes_for_scraping(pool).await?;
if let Ok(mut queue_lock) = queue.lock() {
queue_lock.extend(nodes);
} else {
@@ -82,7 +82,7 @@ impl Scraper {
return Ok(());
}
Self::process_description_queue(pool, queue).await?;
Self::process_description_queue(pool, queue).await;
Ok(())
}
@@ -91,7 +91,7 @@ impl Scraper {
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
let nodes = get_mixing_nodes_for_scraping(pool).await?;
let nodes = get_nodes_for_scraping(pool).await?;
tracing::info!("Querying {} mixing nodes", nodes.len());
if let Ok(mut queue_lock) = queue.lock() {
queue_lock.extend(nodes);
@@ -100,14 +100,11 @@ impl Scraper {
return Ok(());
}
Self::process_packet_queue(pool, queue).await?;
Self::process_packet_queue(pool, queue).await;
Ok(())
}
async fn process_description_queue(
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
async fn process_description_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
loop {
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
@@ -132,12 +129,15 @@ impl Scraper {
tokio::spawn(async move {
match scrape_and_store_description(&pool, &node).await {
Ok(_) => debug!(
"✅ Description task #{} for node {} complete",
task_id, node.node_id
"📝 ✅ Description task #{} for node {} complete",
task_id,
node.node_id()
),
Err(e) => debug!(
"❌ Description task #{} for node {} failed: {}",
task_id, node.node_id, e
"📝 ❌ Description task #{} for node {} failed: {}",
task_id,
node.node_id(),
e
),
}
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
@@ -146,13 +146,9 @@ impl Scraper {
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
}
}
Ok(())
}
async fn process_packet_queue(
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
async fn process_packet_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
loop {
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
@@ -177,12 +173,15 @@ impl Scraper {
tokio::spawn(async move {
match scrape_and_store_packet_stats(&pool, &node).await {
Ok(_) => debug!(
"✅ Packet stats task #{} for node {} complete",
task_id, node.node_id
"📊 ✅ Packet stats task #{} for node {} complete",
task_id,
node.node_id()
),
Err(e) => debug!(
"❌ Packet stats task #{} for node {} failed: {}",
task_id, node.node_id, e
"📊 ❌ Packet stats task #{} for node {} failed: {}",
task_id,
node.node_id(),
e
),
}
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
@@ -191,6 +190,5 @@ impl Scraper {
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
}
}
Ok(())
}
}
@@ -1,14 +1,14 @@
#![allow(deprecated)]
use crate::db::models::{
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
gateway, mixnode, GatewayInsertRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_HISTORICAL_COUNT, MIXNODES_LEGACY_COUNT, NYMNODES_DESCRIBED_COUNT, NYMNODE_COUNT,
};
use crate::db::{queries, DbPool};
use crate::monitor::geodata::{Location, NodeGeoData};
use crate::utils::{decimal_to_i64, LogError, NumericalCheckedCast};
use anyhow::anyhow;
use cosmwasm_std::Decimal;
use moka::future::Cache;
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::{NodeId, NymApiClientExt};
@@ -29,7 +29,6 @@ pub(crate) use geodata::IpInfoClient;
mod geodata;
// TODO dz should be configurable
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
@@ -109,7 +108,11 @@ impl Monitor {
let gateways = described_nodes
.iter()
.filter(|node| node.description.declared_role.entry)
.filter(|node| {
node.description.declared_role.entry
|| node.description.declared_role.exit_ipr
|| node.description.declared_role.exit_nr
})
.collect::<Vec<_>>();
let bonded_node_info = api_client
@@ -120,12 +123,18 @@ impl Monitor {
// for faster reads
.collect::<HashMap<_, _>>();
tracing::info!("🟣 bonded_nodes: {}", bonded_node_info.len());
let nym_nodes = api_client
.get_all_basic_nodes()
.await
.log_error("get_all_basic_nodes")?;
queries::insert_nym_nodes(&self.db_pool, nym_nodes.clone(), &bonded_node_info).await?;
queries::insert_nym_nodes(&self.db_pool, nym_nodes.clone(), &bonded_node_info)
.await
.map(|_| {
tracing::debug!("{} nym nodes written to DB!", nym_nodes.len());
})?;
let mut gateway_geodata = Vec::new();
for gateway in gateways.iter() {
@@ -198,10 +207,11 @@ impl Monitor {
let gateway_records = self.prepare_gateway_data(&gateways, gateway_geodata, &nym_nodes)?;
let pool = self.db_pool.clone();
let gateways_count = gateway_records.len();
queries::insert_gateways(&pool, gateway_records)
.await
.map(|_| {
tracing::debug!("Gateway info written to DB!");
tracing::debug!("{} gateway records written to DB!", gateways_count);
})?;
let mixnode_records = self.prepare_mixnode_data(
@@ -209,10 +219,11 @@ impl Monitor {
mixnodes_described,
delegation_program_members,
)?;
let mixnodes_count = mixnode_records.len();
queries::insert_mixnodes(&pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("Mixnode info written to DB!");
tracing::debug!("{} mixnode info written to DB!", mixnodes_count);
})?;
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(&pool).await?;
@@ -299,13 +310,13 @@ impl Monitor {
fn prepare_gateway_data(
&self,
gateways: &[&NymNodeDescription],
described_gateways: &[&NymNodeDescription],
gateway_geodata: Vec<NodeGeoData>,
skimmed_gateways: &[SkimmedNode],
) -> anyhow::Result<Vec<GatewayRecord>> {
) -> anyhow::Result<Vec<GatewayInsertRecord>> {
let mut gateway_records = Vec::new();
for gateway in gateways {
for gateway in described_gateways {
let identity_key = gateway.ed25519_identity_key().to_base58_string();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
@@ -329,7 +340,7 @@ impl Monitor {
.unwrap_or_default()
.round_to_integer();
gateway_records.push(GatewayRecord {
gateway_records.push(GatewayInsertRecord {
identity_key: identity_key.to_owned(),
bonded,
self_described,
@@ -400,33 +411,6 @@ impl Monitor {
}
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
@@ -464,39 +448,3 @@ async fn get_delegation_program_details(
Ok(mix_ids)
}
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
@@ -17,15 +17,14 @@ use tracing::instrument;
mod error;
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6); //6h, data only update once a day
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6);
const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year
#[instrument(level = "debug", name = "node_scraper", skip_all)]
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) {
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
loop {
//No graceful shutdown?
tracing::info!("Refreshing node self-described metrics...");
if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await {
@@ -123,7 +122,7 @@ impl MetricsScrapingData {
}
}
#[instrument(level = "debug", name = "metrics_scraper", skip_all)]
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
async fn try_scrape_metrics(&self) -> Option<SessionStats> {
match self.try_get_client().await {
Ok(client) => {
@@ -0,0 +1,104 @@
use cosmwasm_std::Decimal;
use itertools::Itertools;
use rand::prelude::SliceRandom;
use rand::SeedableRng;
// pub(crate) fn generate_node_name(identity: ed25519::PublicKey) -> String {
pub(crate) fn generate_node_name(node_id: i64) -> String {
let seed = {
let node_id_bytes = node_id.to_le_bytes();
let mut seed = [0u8; 32];
for i in 0..4 {
seed[i * 8..(i + 1) * 8].copy_from_slice(&node_id_bytes);
}
seed
};
let mut rng = rand_chacha::ChaCha20Rng::from_seed(seed);
let words = bip39::Language::English.word_list();
words.choose_multiple(&mut rng, 3).join(" ")
}
#[allow(clippy::items_after_test_module)]
#[cfg(test)]
mod test {
use rand::Rng;
use super::*;
#[test]
fn generate_node_name_should_be_deterministic() {
let mut rng = rand::thread_rng();
let node_id: i64 = rng.gen();
let different_node_id: i64 = rng.gen();
let node_name = generate_node_name(node_id);
let node_name_different = generate_node_name(different_node_id);
assert_ne!(node_name, node_name_different);
let node_name_same = generate_node_name(node_id);
assert_eq!(node_name, node_name_same);
}
}
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
pub(crate) trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
+31 -2
View File
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use dashmap::DashMap;
use std::fmt::Display;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use time::OffsetDateTime;
@@ -52,7 +53,7 @@ impl MixingStats {
self.ingress.senders.entry(source).or_default().malformed += 1;
}
pub fn ingress_received_forward_packet(&self, source: IpAddr) {
pub fn ingress_received_forward_packet(&self, source: IpAddr, version: PacketKind) {
self.ingress
.forward_hop_packets_received
.fetch_add(1, Ordering::Relaxed);
@@ -62,9 +63,10 @@ impl MixingStats {
.or_default()
.forward_packets
.received += 1;
*self.ingress.received_versions.entry(version).or_default() += 1;
}
pub fn ingress_received_final_hop_packet(&self, source: IpAddr) {
pub fn ingress_received_final_hop_packet(&self, source: IpAddr, version: PacketKind) {
self.ingress
.final_hop_packets_received
.fetch_add(1, Ordering::Relaxed);
@@ -74,6 +76,7 @@ impl MixingStats {
.or_default()
.final_hop_packets
.received += 1;
*self.ingress.received_versions.entry(version).or_default() += 1;
}
pub fn ingress_excessive_delay_packet(&self) {
@@ -196,8 +199,30 @@ pub struct IngressRecipientStats {
pub malformed: usize,
}
#[derive(Debug, Default, Copy, Clone, Hash, PartialEq, Eq)]
pub enum PacketKind {
#[default]
Unknown,
Outfox,
Sphinx(u16),
}
impl Display for PacketKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
PacketKind::Unknown => "unknown".fmt(f),
PacketKind::Outfox => "outfox".fmt(f),
PacketKind::Sphinx(sphinx_version) => {
write!(f, "sphinx-{sphinx_version}")
}
}
}
}
#[derive(Default)]
pub struct IngressMixingStats {
received_versions: DashMap<PacketKind, i64>,
// forward hop packets (i.e. to mixnode)
forward_hop_packets_received: AtomicUsize,
@@ -248,6 +273,10 @@ impl IngressMixingStats {
&self.senders
}
pub fn packet_versions(&self) -> &DashMap<PacketKind, i64> {
&self.received_versions
}
pub fn remove_stale_sender(&self, sender: IpAddr) {
self.senders.remove(&sender);
}
@@ -1,6 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::mixnet::PacketKind;
use nym_metrics::{metrics_registry, HistogramTimer, Metric};
use std::sync::LazyLock;
use strum::{Display, EnumCount, EnumIter, EnumProperty, IntoEnumIterator};
@@ -28,6 +29,10 @@ const CLIENT_SESSION_DURATION_BUCKETS: &[f64] = &[
pub enum PrometheusMetric {
// # MIXNET
// ## INGRESS
#[strum(to_string = "mixnet_ingress_packet_version_{kind}")]
#[strum(props(help = "The number of ingress packets received with the particular version"))]
MixnetIngressPacketVersion { kind: PacketKind },
#[strum(props(help = "The number of ingress forward hop sphinx packets received"))]
MixnetIngressForwardPacketsReceived,
@@ -178,7 +183,11 @@ impl PrometheusMetric {
}
fn is_complex(&self) -> bool {
matches!(self, PrometheusMetric::EntryClientSessionsDurations { .. })
matches!(
self,
PrometheusMetric::EntryClientSessionsDurations { .. }
| PrometheusMetric::MixnetIngressPacketVersion { .. }
)
// match self {
// PrometheusMetric::EntryClientSessionsDurations { .. } => true,
// _ => false,
@@ -190,6 +199,9 @@ impl PrometheusMetric {
let help = self.help();
match self {
PrometheusMetric::MixnetIngressPacketVersion { .. } => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressForwardPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
@@ -364,7 +376,7 @@ mod tests {
// a sanity check for anyone adding new metrics. if this test fails,
// make sure any methods on `PrometheusMetric` enum don't need updating
// or require custom Display impl
assert_eq!(37, PrometheusMetric::COUNT)
assert_eq!(38, PrometheusMetric::COUNT)
}
#[test]
@@ -385,6 +397,24 @@ mod tests {
assert_eq!(
"nym_node_entry_client_sessions_durations_vpn",
parameterised
)
);
let parameterised = PrometheusMetric::MixnetIngressPacketVersion {
kind: PacketKind::Outfox,
}
.to_string();
assert_eq!(
"nym_node_mixnet_ingress_packet_version_outfox",
parameterised
);
let parameterised = PrometheusMetric::MixnetIngressPacketVersion {
kind: PacketKind::Sphinx(42),
}
.to_string();
assert_eq!(
"nym_node_mixnet_ingress_packet_version_sphinx-42",
parameterised
);
}
}
@@ -43,6 +43,15 @@ impl OnUpdateMetricsHandler for PrometheusGlobalNodeMetricsRegistryUpdater {
// # MIXNET
// ## INGRESS
for version_entry in self.metrics.mixnet.ingress.packet_versions() {
self.prometheus_wrapper.set(
MixnetIngressPacketVersion {
kind: *version_entry.key(),
},
*version_entry.value(),
)
}
self.prometheus_wrapper.set(
MixnetIngressForwardPacketsReceived,
self.metrics.mixnet.ingress.forward_hop_packets_received() as i64,
+12 -9
View File
@@ -8,7 +8,7 @@ use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::codec::NymCodec;
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_framing::processing::{
process_framed_packet, MixProcessingResult, ProcessedFinalHop,
process_framed_packet, MixProcessingResultData, ProcessedFinalHop,
};
use nym_sphinx_types::Delay;
use std::net::SocketAddr;
@@ -44,7 +44,7 @@ impl ConnectionHandler {
ConnectionHandler {
shared: SharedData {
processing_config: shared.processing_config,
sphinx_key: shared.sphinx_key.clone(),
sphinx_keys: shared.sphinx_keys.clone(),
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
metrics: shared.metrics.clone(),
@@ -135,7 +135,8 @@ impl ConnectionHandler {
nanos!("handle_received_nym_packet", {
// 1. attempt to unwrap the packet
let unwrapped_packet = process_framed_packet(packet, &self.shared.sphinx_key);
let unwrapped_packet =
process_framed_packet(packet, self.shared.sphinx_keys.private_key().as_ref());
// 2. increment our favourite metrics stats
self.shared
@@ -144,12 +145,14 @@ impl ConnectionHandler {
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Ok(MixProcessingResult::ForwardHop(forward_packet, delay)) => {
self.handle_forward_packet(forward_packet, delay);
}
Ok(MixProcessingResult::FinalHop(final_hop_data)) => {
self.handle_final_hop(final_hop_data).await;
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(packet, delay);
}
MixProcessingResultData::FinalHop { final_hop_data } => {
self.handle_final_hop(final_hop_data).await;
}
},
}
})
}
+28 -11
View File
@@ -7,9 +7,12 @@ use crate::node::mixnet::SharedFinalHopData;
use nym_crypto::asymmetric::x25519;
use nym_gateway::node::GatewayStorageError;
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketToForward};
use nym_node_metrics::mixnet::PacketKind;
use nym_node_metrics::NymNodeMetrics;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::processing::{MixProcessingResult, PacketProcessingError};
use nym_sphinx_framing::processing::{
MixPacketVersion, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
};
use nym_sphinx_types::DestinationAddressBytes;
use nym_task::ShutdownToken;
use std::io;
@@ -45,8 +48,7 @@ impl ProcessingConfig {
// explicitly do NOT derive clone as we want to manually apply relevant suffixes to the task clients
pub(crate) struct SharedData {
pub(super) processing_config: ProcessingConfig,
// TODO: this type is not `Zeroize` : (
pub(super) sphinx_key: Arc<nym_sphinx_types::PrivateKey>,
pub(super) sphinx_keys: Arc<x25519::KeyPair>,
// used for FORWARD mix packets and FINAL ack packets
pub(super) mixnet_forwarder: MixForwardingSender,
@@ -58,10 +60,17 @@ pub(crate) struct SharedData {
pub(super) shutdown: ShutdownToken,
}
fn convert_to_metrics_version(processed: MixPacketVersion) -> PacketKind {
match processed {
MixPacketVersion::Outfox => PacketKind::Outfox,
MixPacketVersion::Sphinx(sphinx_version) => PacketKind::Sphinx(sphinx_version.value()),
}
}
impl SharedData {
pub(crate) fn new(
processing_config: ProcessingConfig,
x25519_key: &x25519::PrivateKey,
x25519_keys: Arc<x25519::KeyPair>,
mixnet_forwarder: MixForwardingSender,
final_hop: SharedFinalHopData,
metrics: NymNodeMetrics,
@@ -69,7 +78,7 @@ impl SharedData {
) -> Self {
SharedData {
processing_config,
sphinx_key: Arc::new(x25519_key.into()),
sphinx_keys: x25519_keys,
mixnet_forwarder,
final_hop,
metrics,
@@ -99,10 +108,18 @@ impl SharedData {
processing_result: &Result<MixProcessingResult, PacketProcessingError>,
source: IpAddr,
) {
match processing_result {
Err(_) => self.metrics.mixnet.ingress_malformed_packet(source),
Ok(MixProcessingResult::ForwardHop(_, delay)) => {
self.metrics.mixnet.ingress_received_forward_packet(source);
let Ok(processing_result) = processing_result else {
self.metrics.mixnet.ingress_malformed_packet(source);
return;
};
let packet_version = convert_to_metrics_version(processing_result.packet_version);
match processing_result.processing_data {
MixProcessingResultData::ForwardHop { delay, .. } => {
self.metrics
.mixnet
.ingress_received_forward_packet(source, packet_version);
// check if the delay wasn't excessive
if let Some(delay) = delay {
@@ -111,10 +128,10 @@ impl SharedData {
}
}
}
Ok(MixProcessingResult::FinalHop(_)) => {
MixProcessingResultData::FinalHop { .. } => {
self.metrics
.mixnet
.ingress_received_final_hop_packet(source);
.ingress_received_final_hop_packet(source, packet_version);
}
}
}
+1 -1
View File
@@ -993,7 +993,7 @@ impl NymNode {
let shared = mixnet::SharedData::new(
processing_config,
self.x25519_sphinx_keys.private_key(),
self.x25519_sphinx_keys.clone(),
mix_packet_sender.clone(),
final_hop_data,
self.metrics.clone(),
+1 -1
View File
@@ -14,7 +14,7 @@ rayon = { workspace = true }
blake3 = { workspace = true }
zeroize = { workspace = true }
chacha20 = { workspace = true, features = ["std"] }
curve25519-dalek = { workspace = true }
x25519-dalek = { workspace = true }
chacha20poly1305 = { workspace = true }
getrandom = { workspace = true, features = ["js"] }
thiserror = { workspace = true }
+19 -29
View File
@@ -54,17 +54,6 @@
//! routing data for the layer, and the remaining Header; separately the master key is used to lion encrypt
//! the payload. The process is repeated for each layer (from last to first) to construct the full message.
use chacha20poly1305::AeadInPlace;
use chacha20poly1305::ChaCha20Poly1305;
use chacha20poly1305::KeyInit;
use chacha20poly1305::Tag;
use curve25519_dalek::constants::ED25519_BASEPOINT_TABLE;
use curve25519_dalek::montgomery::MontgomeryPoint;
use curve25519_dalek::scalar::Scalar;
use std::ops::Range;
use crate::constants::groupelementbytes;
use crate::constants::tagbytes;
use crate::constants::DEFAULT_HOPS;
@@ -75,6 +64,11 @@ use crate::constants::ROUTING_INFORMATION_LENGTH_BY_STAGE;
use crate::constants::TAGBYTES;
use crate::error::OutfoxError;
use crate::lion::*;
use chacha20poly1305::AeadInPlace;
use chacha20poly1305::ChaCha20Poly1305;
use chacha20poly1305::KeyInit;
use chacha20poly1305::Tag;
use std::ops::Range;
/// A structure that holds mix packet construction parameters. These incluse the length
/// of the routing information at each hop, the number of hops, and the payload length.
@@ -218,13 +212,11 @@ impl MixStageParameters {
pub fn encode_mix_layer(
&self,
buffer: &mut [u8],
user_secret_key: &[u8],
node_pub_key: &[u8],
user_secret_key: &x25519_dalek::StaticSecret,
mix_public_key: x25519_dalek::PublicKey,
destination: &[u8; 32],
) -> Result<MontgomeryPoint, OutfoxError> {
) -> Result<x25519_dalek::SharedSecret, OutfoxError> {
let routing_data = destination;
let mix_public_key = MontgomeryPoint(node_pub_key.try_into()?);
let user_secret_key = Scalar::from_bytes_mod_order(user_secret_key.try_into()?);
if buffer.len() != self.incoming_packet_length() {
return Err(OutfoxError::LenMismatch {
@@ -240,14 +232,14 @@ impl MixStageParameters {
});
}
let user_public_key = (ED25519_BASEPOINT_TABLE * &user_secret_key).to_montgomery();
let shared_key = user_secret_key * mix_public_key;
let user_public_key = x25519_dalek::PublicKey::from(user_secret_key);
let shared_key = user_secret_key.diffie_hellman(&mix_public_key);
// Copy rounting data into buffer
buffer[self.routing_data_range()].copy_from_slice(routing_data);
// Perform the AEAD
let header_aead_key = ChaCha20Poly1305::new_from_slice(&shared_key.0[..])?;
let header_aead_key = ChaCha20Poly1305::new_from_slice(shared_key.as_bytes())?;
let nonce = [0u8; 12];
let tag = header_aead_key
@@ -258,10 +250,10 @@ impl MixStageParameters {
buffer[self.tag_range()].copy_from_slice(&tag[..]);
// Copy own public key into buffer
buffer[self.pub_element_range()].copy_from_slice(&user_public_key.0[..]);
buffer[self.pub_element_range()].copy_from_slice(user_public_key.as_bytes());
// Do a round of LION on the payload
lion_transform_encrypt(&mut buffer[self.payload_range()], &shared_key.0)?;
lion_transform_encrypt(&mut buffer[self.payload_range()], shared_key.as_bytes())?;
Ok(shared_key)
}
@@ -269,12 +261,9 @@ impl MixStageParameters {
pub fn decode_mix_layer(
&self,
buffer: &mut [u8],
mix_secret_key: &[u8],
mix_secret_key: &x25519_dalek::StaticSecret,
) -> Result<Vec<u8>, OutfoxError> {
// Check the length of the incoming buffer is correct.
let mix_secret_key = Scalar::from_bytes_mod_order(mix_secret_key.try_into()?);
if buffer.len() != self.incoming_packet_length() {
return Err(OutfoxError::LenMismatch {
expected: buffer.len(),
@@ -283,11 +272,12 @@ impl MixStageParameters {
}
// Derive the shared key for this packet
let user_public_key = MontgomeryPoint(buffer[self.pub_element_range()].try_into()?);
let shared_key = mix_secret_key * user_public_key;
let user_public_key_bytes: [u8; 32] = buffer[self.pub_element_range()].try_into()?;
let user_public_key = x25519_dalek::PublicKey::from(user_public_key_bytes);
let shared_key = mix_secret_key.diffie_hellman(&user_public_key);
// Compute the AEAD and check the Tag, if wrong return Err
let header_aead_key = ChaCha20Poly1305::new_from_slice(&shared_key.0[..])?;
let header_aead_key = ChaCha20Poly1305::new_from_slice(shared_key.as_bytes())?;
let nonce = [0; 12];
let tag_bytes = buffer[self.tag_range()].to_vec();
@@ -304,7 +294,7 @@ impl MixStageParameters {
let routing_data = buffer[self.routing_data_range()].to_vec();
// Do a round of LION on the payload
lion_transform_decrypt(&mut buffer[self.payload_range()], &shared_key.0)?;
lion_transform_decrypt(&mut buffer[self.payload_range()], shared_key.as_bytes())?;
Ok(routing_data)
}
+7 -12
View File
@@ -1,17 +1,14 @@
use std::{array::TryFromSliceError, collections::VecDeque, ops::Range};
use crate::{
constants::{DEFAULT_HOPS, MAGIC_SLICE, MIN_PACKET_SIZE, MIX_PARAMS_LEN},
error::OutfoxError,
format::{MixCreationParameters, MixStageParameters},
};
use rand::{rngs::OsRng, RngCore};
use sphinx_packet::{
crypto::PrivateKey,
packet::builder::DEFAULT_PAYLOAD_SIZE,
route::{Destination, Node},
};
use std::{array::TryFromSliceError, collections::VecDeque, ops::Range};
#[derive(Debug)]
pub struct OutfoxPacket {
@@ -90,8 +87,7 @@ impl OutfoxPacket {
destination: &Destination,
packet_size: Option<usize>,
) -> Result<OutfoxPacket, OutfoxError> {
let mut secret_key = [0; 32];
OsRng.fill_bytes(&mut secret_key);
let secret_key = x25519_dalek::StaticSecret::random();
let packet_size = packet_size.unwrap_or(DEFAULT_PAYLOAD_SIZE);
let packet_size = if packet_size < MIN_PACKET_SIZE {
MIN_PACKET_SIZE
@@ -110,7 +106,7 @@ impl OutfoxPacket {
stage_params.encode_mix_layer(
&mut buffer[range],
&secret_key,
route.last().unwrap().pub_key.as_bytes(),
route.last().unwrap().pub_key,
destination.address.as_bytes_ref(),
)?;
@@ -130,11 +126,11 @@ impl OutfoxPacket {
// We know that we'll always get 4 nodes, so we can unwrap here
let processing_node = nodes.last().unwrap();
let destination_node = nodes.first().unwrap();
OsRng.fill_bytes(&mut secret_key);
let secret_key = x25519_dalek::StaticSecret::random();
stage_params.encode_mix_layer(
&mut buffer[range],
&secret_key,
processing_node.pub_key.as_bytes(),
processing_node.pub_key,
destination_node.address.as_bytes_ref(),
)?;
}
@@ -168,7 +164,7 @@ impl OutfoxPacket {
pub fn decode_mix_layer(
&mut self,
layer: usize,
mix_secret_key: &[u8; 32],
mix_secret_key: &x25519_dalek::StaticSecret,
) -> Result<Vec<u8>, OutfoxError> {
let (range, params) = self.stage_params(layer);
let routing_data =
@@ -198,7 +194,6 @@ impl OutfoxPacket {
&mut self,
mix_secret_key: &PrivateKey,
) -> Result<[u8; 32], OutfoxError> {
let mix_secret_key = mix_secret_key.to_bytes();
let routing_lenght_by_stage = self
.mix_params()
.routing_information_length_by_stage
@@ -210,7 +205,7 @@ impl OutfoxPacket {
break;
}
}
self.decode_mix_layer(layer, &mix_secret_key)?;
self.decode_mix_layer(layer, mix_secret_key)?;
self.update_routing_information(layer)?;
let (range, stage_params) = self.mix_params().get_stage_params(layer);
let routing_bytes = &self.payload()[range][stage_params.routing_data_range()];
+19 -17
View File
@@ -9,11 +9,9 @@ mod tests {
repeat_with(|| fastrand::u8(..)).take(n).collect()
}
use curve25519_dalek::constants::ED25519_BASEPOINT_TABLE;
use curve25519_dalek::scalar::Scalar;
use nym_outfox::packet::OutfoxPacket;
use sphinx_packet::constants::NODE_ADDRESS_LENGTH;
use sphinx_packet::crypto::PublicKey;
use sphinx_packet::crypto::{PrivateKey, PublicKey};
use sphinx_packet::route::Destination;
use sphinx_packet::route::DestinationAddressBytes;
use sphinx_packet::route::Node;
@@ -22,6 +20,12 @@ mod tests {
use nym_outfox::format::*;
use nym_outfox::lion::*;
pub fn keygen() -> (PrivateKey, PublicKey) {
let private_key = PrivateKey::random();
let public_key = PublicKey::from(&private_key);
(private_key, public_key)
}
#[test]
fn test_encode_decode() {
let mix_params = MixStageParameters {
@@ -30,11 +34,9 @@ mod tests {
payload_length_bytes: 1024, // 1kb
};
let user_secret = randombytes(32);
let mix_secret = randombytes(32);
let mix_secret_scalar =
Scalar::from_bytes_mod_order(mix_secret.clone().try_into().unwrap());
let mix_public_key = (ED25519_BASEPOINT_TABLE * &mix_secret_scalar).to_montgomery();
let user_secret = x25519_dalek::StaticSecret::random();
let mix_secret = x25519_dalek::StaticSecret::random();
let mix_public_key = x25519_dalek::PublicKey::from(&mix_secret);
let routing = [0; 32];
let destination = [0; 32];
@@ -52,7 +54,7 @@ mod tests {
.encode_mix_layer(
&mut new_buffer[..],
&user_secret,
node.pub_key.as_bytes(),
node.pub_key,
&destination,
)
.unwrap();
@@ -93,23 +95,23 @@ mod tests {
#[test]
fn test_packet_params_short() {
let (node1_pk, node1_pub) = sphinx_packet::crypto::keygen();
let (node1_pk, node1_pub) = keygen();
let node1 = Node::new(
NodeAddressBytes::from_bytes([0u8; NODE_ADDRESS_LENGTH]),
node1_pub,
);
let (node2_pk, node2_pub) = sphinx_packet::crypto::keygen();
let (node2_pk, node2_pub) = keygen();
let node2 = Node::new(
NodeAddressBytes::from_bytes([1u8; NODE_ADDRESS_LENGTH]),
node2_pub,
);
let (node3_pk, node3_pub) = sphinx_packet::crypto::keygen();
let (node3_pk, node3_pub) = keygen();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pub,
);
let (gateway_pk, gateway_pub) = sphinx_packet::crypto::keygen();
let (gateway_pk, gateway_pub) = keygen();
let gateway = Node::new(
NodeAddressBytes::from_bytes([3u8; NODE_ADDRESS_LENGTH]),
gateway_pub,
@@ -149,23 +151,23 @@ mod tests {
#[test]
fn test_packet_params_long() {
let (node1_pk, node1_pub) = sphinx_packet::crypto::keygen();
let (node1_pk, node1_pub) = keygen();
let node1 = Node::new(
NodeAddressBytes::from_bytes([0u8; NODE_ADDRESS_LENGTH]),
node1_pub,
);
let (node2_pk, node2_pub) = sphinx_packet::crypto::keygen();
let (node2_pk, node2_pub) = keygen();
let node2 = Node::new(
NodeAddressBytes::from_bytes([1u8; NODE_ADDRESS_LENGTH]),
node2_pub,
);
let (node3_pk, node3_pub) = sphinx_packet::crypto::keygen();
let (node3_pk, node3_pub) = keygen();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pub,
);
let (gateway_pk, gateway_pub) = sphinx_packet::crypto::keygen();
let (gateway_pk, gateway_pub) = keygen();
let gateway = Node::new(
NodeAddressBytes::from_bytes([3u8; NODE_ADDRESS_LENGTH]),
gateway_pub,
+53 -183
View File
@@ -30,7 +30,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@@ -55,7 +55,7 @@ dependencies = [
"cipher",
"ctr",
"ghash",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -154,16 +154,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2e554a8638bdc1e4eae9984845306cc95f8a9208ba8d49c3859fd958b46774d"
dependencies = [
"base64ct",
"blake2 0.10.6",
"blake2",
"cpufeatures",
"password-hash",
]
[[package]]
name = "arrayref"
version = "0.3.7"
name = "async-compression"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
dependencies = [
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-trait"
@@ -306,7 +313,7 @@ dependencies = [
"ripemd",
"secp256k1",
"sha2 0.10.8",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -342,18 +349,6 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "blake2"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94cb07b0da6a73955f8fb85d24c466778e70cda767a568229b104f0264089330"
dependencies = [
"byte-tools",
"crypto-mac",
"digest 0.8.1",
"opaque-debug 0.2.3",
]
[[package]]
name = "blake2"
version = "0.10.6"
@@ -375,7 +370,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@@ -384,7 +379,7 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@@ -399,7 +394,7 @@ dependencies = [
"rand_core 0.6.4",
"serde",
"serdect 0.3.0-pre.0",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -456,12 +451,6 @@ version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "bytemuck"
version = "1.13.1"
@@ -560,9 +549,9 @@ dependencies = [
[[package]]
name = "celes"
version = "2.4.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39b9a21273925d7cc9e8a9a5f068122341336813c607014f5ef64f82b6acba58"
checksum = "54441489dce7026efc8f01d1aa996c23fa39dd615a953d0e934433a42f61dd30"
dependencies = [
"serde",
]
@@ -608,16 +597,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddf3c081b5fba1e5615640aae998e0fbd10c24cbd897ee39ed754a77601a4862"
dependencies = [
"byteorder",
"keystream",
]
[[package]]
name = "cipher"
version = "0.4.4"
@@ -724,13 +703,12 @@ dependencies = [
[[package]]
name = "colored"
version = "2.0.4"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2674ec482fbc38012cf31e6c42ba0177b431a0cb6f15fe40efa5aab1bda516f6"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"is-terminal",
"lazy_static",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -973,9 +951,9 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4c2f4e1afd912bc40bfd6fed5d9dc1f288e0ba01bfcc835cc5bc3eb13efe15"
dependencies = [
"generic-array 0.14.7",
"generic-array",
"rand_core 0.6.4",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -985,21 +963,11 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array 0.14.7",
"generic-array",
"rand_core 0.6.4",
"typenum",
]
[[package]]
name = "crypto-mac"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5"
dependencies = [
"generic-array 0.12.4",
"subtle 1.0.0",
]
[[package]]
name = "cssparser"
version = "0.27.2"
@@ -1055,7 +1023,7 @@ dependencies = [
"byteorder",
"digest 0.9.0",
"rand_core 0.5.1",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -1073,7 +1041,7 @@ dependencies = [
"platforms",
"rustc_version",
"serde",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -1283,22 +1251,13 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array 0.12.4",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@@ -1310,7 +1269,7 @@ dependencies = [
"block-buffer 0.10.4",
"const-oid",
"crypto-common",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -1475,7 +1434,7 @@ dependencies = [
"rand_core 0.6.4",
"serde",
"sha2 0.10.8",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -1510,13 +1469,13 @@ dependencies = [
"crypto-bigint",
"digest 0.10.7",
"ff",
"generic-array 0.14.7",
"generic-array",
"group",
"pkcs8",
"rand_core 0.6.4",
"sec1",
"serdect 0.2.0",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -1629,7 +1588,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449"
dependencies = [
"rand_core 0.6.4",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -1909,15 +1868,6 @@ dependencies = [
"windows 0.48.0",
]
[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
dependencies = [
"typenum",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -1971,7 +1921,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40"
dependencies = [
"opaque-debug 0.3.0",
"opaque-debug",
"polyval",
]
@@ -2094,7 +2044,7 @@ checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63"
dependencies = [
"ff",
"rand_core 0.6.4",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -2322,15 +2272,6 @@ dependencies = [
"webpki-roots 0.25.4",
]
[[package]]
name = "hkdf"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
dependencies = [
"hmac",
]
[[package]]
name = "hmac"
version = "0.12.1"
@@ -2813,7 +2754,7 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@@ -2970,12 +2911,6 @@ dependencies = [
"signature",
]
[[package]]
name = "keystream"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c33070833c9ee02266356de0c43f723152bd38bd96ddf52c82b3af10c9138b28"
[[package]]
name = "kuchiki"
version = "0.8.1"
@@ -3000,12 +2935,6 @@ version = "0.2.169"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
[[package]]
name = "libm"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4"
[[package]]
name = "line-wrap"
version = "0.1.1"
@@ -3027,18 +2956,6 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503"
[[package]]
name = "lioness"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae926706ba42c425c9457121178330d75e273df2e82e28b758faf3de3a9acb9"
dependencies = [
"arrayref",
"blake2 0.8.1",
"chacha",
"keystream",
]
[[package]]
name = "litemap"
version = "0.7.4"
@@ -3309,7 +3226,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
dependencies = [
"autocfg",
"libm",
]
[[package]]
@@ -3424,7 +3340,7 @@ dependencies = [
"rand 0.8.5",
"serde",
"sha2 0.9.9",
"subtle 2.5.0",
"subtle",
"thiserror 2.0.11",
"zeroize",
]
@@ -3645,7 +3561,6 @@ dependencies = [
name = "nym-sphinx-types"
version = "0.2.0"
dependencies = [
"sphinx-packet",
"thiserror 2.0.11",
]
@@ -3655,7 +3570,7 @@ version = "0.1.0"
dependencies = [
"aes-gcm",
"argon2",
"generic-array 0.14.7",
"generic-array",
"getrandom 0.2.10",
"rand 0.8.5",
"serde",
@@ -3715,7 +3630,7 @@ dependencies = [
"base64 0.22.1",
"bip32",
"bip39",
"colored 2.0.4",
"colored 2.2.0",
"cosmrs 0.21.1",
"cosmwasm-std",
"cw-controllers",
@@ -3823,7 +3738,7 @@ dependencies = [
"base64 0.13.1",
"bip39",
"cfg-if",
"colored 2.0.4",
"colored 2.2.0",
"cosmrs 0.21.1",
"cosmwasm-std",
"dirs 4.0.0",
@@ -3921,12 +3836,6 @@ version = "1.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "opaque-debug"
version = "0.3.0"
@@ -4064,7 +3973,7 @@ checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166"
dependencies = [
"base64ct",
"rand_core 0.6.4",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -4354,7 +4263,7 @@ checksum = "d52cff9d1d4dee5fe6d03729099f4a310a41179e0a10dbf542039873f2e826fb"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug 0.3.0",
"opaque-debug",
"universal-hash",
]
@@ -4605,16 +4514,6 @@ dependencies = [
"getrandom 0.2.10",
]
[[package]]
name = "rand_distr"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
dependencies = [
"num-traits",
"rand 0.8.5",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
@@ -4758,6 +4657,7 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
dependencies = [
"async-compression",
"base64 0.22.1",
"bytes",
"encoding_rs",
@@ -4790,6 +4690,7 @@ dependencies = [
"tokio",
"tokio-native-tls",
"tokio-rustls 0.25.0",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
@@ -4816,7 +4717,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2"
dependencies = [
"hmac",
"subtle 2.5.0",
"subtle",
]
[[package]]
@@ -4925,7 +4826,7 @@ dependencies = [
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.4",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -5078,10 +4979,10 @@ checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc"
dependencies = [
"base16ct",
"der",
"generic-array 0.14.7",
"generic-array",
"pkcs8",
"serdect 0.2.0",
"subtle 2.5.0",
"subtle",
"zeroize",
]
@@ -5332,7 +5233,7 @@ dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug 0.3.0",
"opaque-debug",
]
[[package]]
@@ -5449,31 +5350,6 @@ dependencies = [
"system-deps 5.0.0",
]
[[package]]
name = "sphinx-packet"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabeca95bf5fd0563d6be7ebcb1c6a9fcb135746a0ba9050c47dc68c8607e595"
dependencies = [
"aes",
"arrayref",
"blake2 0.8.1",
"bs58",
"byteorder",
"chacha",
"ctr",
"curve25519-dalek 4.1.2",
"digest 0.10.7",
"hkdf",
"hmac",
"lioness",
"log",
"rand 0.8.5",
"rand_distr",
"sha2 0.10.8",
"subtle 2.5.0",
]
[[package]]
name = "spin"
version = "0.9.8"
@@ -5581,12 +5457,6 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "subtle"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee"
[[package]]
name = "subtle"
version = "2.5.0"
@@ -5977,7 +5847,7 @@ dependencies = [
"serde_repr",
"sha2 0.10.8",
"signature",
"subtle 2.5.0",
"subtle",
"subtle-encoding",
"tendermint-proto 0.34.0",
"time",
@@ -6007,7 +5877,7 @@ dependencies = [
"serde_repr",
"sha2 0.10.8",
"signature",
"subtle 2.5.0",
"subtle",
"subtle-encoding",
"tendermint-proto 0.40.1",
"time",
@@ -6080,7 +5950,7 @@ dependencies = [
"serde",
"serde_bytes",
"serde_json",
"subtle 2.5.0",
"subtle",
"subtle-encoding",
"tendermint 0.40.1",
"tendermint-config",
@@ -6544,7 +6414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle 2.5.0",
"subtle",
]
[[package]]
+2
View File
@@ -44,6 +44,8 @@ tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto = { workspace = true }
regex = "1.11.1"
lazy_static = "1.5.0"
[build-dependencies]
+6 -6
View File
@@ -7,12 +7,12 @@ Look in [env.rs](./src/env.rs) for the names of environment variables that can b
## Running locally
```
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
export NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
export NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
export NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
export NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
export NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
export NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
cargo run -- run
```
+2 -3
View File
@@ -1,13 +1,12 @@
use anyhow::Result;
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
use std::env::var;
use std::io::Write;
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
#[tokio::main]
async fn main() -> Result<()> {
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join(".build")
.join("nyx_chain_watcher.sqlite");
let db_path = PathBuf::from(var("OUT_DIR").unwrap()).join("nyx_chain_watcher.sqlite");
// Create the database directory if it doesn't exist
if let Some(parent) = db_path.parent() {
@@ -22,6 +22,15 @@ pub(crate) struct Args {
)]
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
/// (Override) Watch for transfers to these recipient accounts
#[clap(
long,
value_delimiter = ',',
env = NYX_RECORD_BEARER_VALUE
)]
pub record_bearer_token: Option<String>,
/// (Override) Watch for chain messages of these types
#[clap(
long,
@@ -14,6 +14,7 @@ pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError>
webhook_auth,
ref chain_watcher_db_path,
ref chain_history_db_path,
ref record_bearer_token,
webhook_url,
} = args;
@@ -54,6 +55,11 @@ pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError>
builder = builder.with_chain_scraper_db_path(db_path.clone());
}
if let Some(token) = record_bearer_token {
info!("Setting bearer token for authentication");
builder = builder.with_record_bearer_token(token.clone());
}
if let Some(webhook_url) = webhook_url {
let authentication =
webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token });
+12
View File
@@ -49,6 +49,8 @@ pub struct ConfigBuilder {
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub logging: Option<LoggingSettings>,
pub bearer_token: Option<String>,
}
impl ConfigBuilder {
@@ -60,9 +62,15 @@ impl ConfigBuilder {
logging: None,
db_path: None,
chain_scraper_db_path: None,
bearer_token: None,
}
}
pub fn with_record_bearer_token(mut self, token: String) -> Self {
self.bearer_token = Some(token);
self
}
pub fn with_db_path(mut self, db_path: String) -> Self {
self.db_path = Some(db_path);
self
@@ -96,6 +104,7 @@ impl ConfigBuilder {
data_dir: self.data_dir,
db_path: self.db_path,
chain_scraper_db_path: self.chain_scraper_db_path,
bearer_token: self.bearer_token,
}
}
}
@@ -118,8 +127,11 @@ pub struct Config {
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub bearer_token: Option<String>,
#[serde(default)]
pub logging: LoggingSettings,
}
impl NymConfigTemplate for Config {
+3 -1
View File
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use utoipa::ToSchema;
#[derive(Clone, Deserialize, Debug, ToSchema)]
@@ -32,7 +33,7 @@ pub(crate) struct PriceHistory {
pub(crate) btc: f64,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
#[derive(Serialize, Deserialize, Debug, ToSchema, FromRow)]
pub(crate) struct PaymentRecord {
pub(crate) transaction_hash: String,
pub(crate) sender_address: String,
@@ -41,3 +42,4 @@ pub(crate) struct PaymentRecord {
pub(crate) timestamp: i64,
pub(crate) height: i64,
}
+32 -1
View File
@@ -1,5 +1,7 @@
use crate::db::DbPool;
use anyhow::Result;
use anyhow::{Context, Result};
use regex::Regex;
use crate::db::models::PaymentRecord;
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
@@ -8,6 +10,35 @@ pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
Ok(result.unwrap_or(0))
}
lazy_static::lazy_static! {
static ref HEX_PATTERN: Regex = Regex::new(r"^[A-Fa-f0-9]{64}$").unwrap();
static ref BASE64_PATTERN: Regex = Regex::new(r"^[A-Za-z0-9+/=]+$").unwrap();
}
pub async fn get_transaction_record(pool: &DbPool, record_txs: &str) -> Result<Option<PaymentRecord>> {
let query = if HEX_PATTERN.is_match(record_txs) {
"SELECT transaction_hash, sender_address, receiver_address, amount, timestamp, height
FROM transactions WHERE tx_hash = $1"
} else if BASE64_PATTERN.is_match(record_txs) {
"SELECT transaction_hash, sender_address, receiver_address, amount, timestamp, height
FROM transactions WHERE memo LIKE $1"
} else {
return Ok(None);
};
let param = if BASE64_PATTERN.is_match(record_txs) {
format!("%{}%", record_txs)
} else {
record_txs.to_string()
};
sqlx::query_as::<_, PaymentRecord>(query)
.bind(param)
.fetch_optional(pool)
.await
.context("Database query failed")
}
pub async fn insert_payment(
pool: &DbPool,
transaction_hash: String,
+1
View File
@@ -14,6 +14,7 @@ pub mod vars {
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";
pub const NYX_RECORD_BEARER_VALUE: &str = "NYX_RECORD_BEARER_VALUE";
pub const NYXD_SCRAPER_UNSAFE_NUKE_DB: &str = "NYXD_SCRAPER_UNSAFE_NUKE_DB";
pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
+3 -1
View File
@@ -9,6 +9,7 @@ use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod price;
pub(crate) mod watcher;
pub(crate) mod records;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
@@ -26,7 +27,8 @@ impl RouterBuilder {
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest("/v1", Router::new().nest("/price", price::routes()))
.nest("/v1", Router::new().nest("/watcher", watcher::routes()));
.nest("/v1", Router::new().nest("/watcher", watcher::routes()))
.nest("/v1", Router::new().nest("/records", records::routes()));
Self {
unfinished_router: router,
+30
View File
@@ -0,0 +1,30 @@
use crate::db::models::PaymentRecord;
use crate::db::queries::payments::get_transaction_record;
use crate::http::error::{Error, HttpResult};
use crate::http::state::AppState;
use axum::{extract::{State, Path}, Json, Router};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/records/:record_txs", axum::routing::get(transaction_record))
}
#[utoipa::path(
tag = "Watcher Records",
get,
path = "/v1/records/{record_txs}",
responses(
(status = 200, body = PaymentRecord),
(status = 404, description = "Transaction record not found")
)
)]
/// Fetch a transaction record from the database
async fn transaction_record(
State(state): State<AppState>,
Path(record_txs): Path<String>,
) -> HttpResult<Json<PaymentRecord>> {
get_transaction_record(state.db_pool(), &record_txs)
.await
.map_err(|_| Error::internal())?
.map(Json::from)
.ok_or_else(|| Error::not_found(&record_txs))
}
+8
View File
@@ -1,3 +1,4 @@
use tracing::error;
pub(crate) type HttpResult<T> = Result<T, Error>;
pub(crate) struct Error {
@@ -12,6 +13,13 @@ impl Error {
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub(crate) fn not_found(resource: &str) -> Self {
error!("Resource not found: {}", resource);
Self {
message: format!("{} not found", resource),
status: axum::http::StatusCode::NOT_FOUND,
}
}
}
impl axum::response::IntoResponse for Error {
@@ -156,15 +156,3 @@ pub struct UnblindableShare {
pub issuer_key_bs58: String,
pub blinded_share_bs58: String,
}
#[wasm_bindgen]
impl UnblindableShare {
#[wasm_bindgen(constructor)]
pub fn new(issuer_index: u64, issuer_key_bs58: String, blinded_share_bs58: String) -> Self {
UnblindableShare {
issuer_index,
issuer_key_bs58,
blinded_share_bs58,
}
}
}