Compare commits

..

6 Commits

Author SHA1 Message Date
Jędrzej Stuczyński dce0d161ea replace logger 2024-10-15 17:11:21 +01:00
Jędrzej Stuczyński 853a62bc5b added debug symbols to release builds 2024-10-14 18:53:09 +01:00
Jędrzej Stuczyński 1927614803 added temporary and experimental memory profiling 2024-10-11 14:34:28 +01:00
Jędrzej Stuczyński 75a5192c6d Merge pull request #4958 from nymtech/bugfix/websocket-message-handling
bugfix: replace unreachable macro with an error return
2024-10-09 17:24:17 +01:00
Jędrzej Stuczyński 25ad0920cf bugfix: replace unreachable macro with an error return 2024-10-09 17:15:41 +01:00
Bogdan-Ștefan Neacşu a4c6f51fe0 Don't kill gateway on handle drop (#4934) 2024-09-27 11:02:39 +02:00
33 changed files with 350 additions and 1051 deletions
Generated
+70 -61
View File
@@ -1259,12 +1259,12 @@ dependencies = [
[[package]]
name = "cosmos-sdk-proto"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "462e1f6a8e005acc8835d32d60cbd7973ed65ea2a8d8473830e675f050956427"
version = "0.22.0-pre"
source = "git+https://github.com/cosmos/cosmos-rust?rev=4b1332e6d8258ac845cef71589c8d362a669675a#4b1332e6d8258ac845cef71589c8d362a669675a"
dependencies = [
"prost 0.13.5",
"tendermint-proto 0.40.1",
"prost 0.12.6",
"prost-types 0.12.6",
"tendermint-proto 0.37.0",
]
[[package]]
@@ -1289,12 +1289,11 @@ dependencies = [
[[package]]
name = "cosmrs"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1394c263335da09e8ba8c4b2c675d804e3e0deb44cce0866a5f838d3ddd43d02"
version = "0.17.0-pre"
source = "git+https://github.com/cosmos/cosmos-rust?rev=4b1332e6d8258ac845cef71589c8d362a669675a#4b1332e6d8258ac845cef71589c8d362a669675a"
dependencies = [
"bip32",
"cosmos-sdk-proto 0.26.1",
"cosmos-sdk-proto 0.22.0-pre",
"ecdsa",
"eyre",
"k256",
@@ -1303,7 +1302,7 @@ dependencies = [
"serde_json",
"signature",
"subtle-encoding",
"tendermint 0.40.1",
"tendermint 0.37.0",
"tendermint-rpc",
"thiserror",
]
@@ -4295,6 +4294,9 @@ dependencies = [
"tap",
"tempfile",
"thiserror",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
"tikv-jemallocator",
"time",
"tokio",
"tokio-stream",
@@ -4315,7 +4317,7 @@ name = "nym-api-requests"
version = "0.1.0"
dependencies = [
"bs58",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"cosmwasm-std",
"ecdsa",
"getset",
@@ -4331,7 +4333,7 @@ dependencies = [
"serde",
"serde_json",
"sha2 0.10.8",
"tendermint 0.40.1",
"tendermint 0.37.0",
"thiserror",
"time",
"ts-rs",
@@ -4458,7 +4460,7 @@ name = "nym-bity-integration"
version = "0.1.0"
dependencies = [
"anyhow",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"eyre",
"k256",
"nym-cli-commands",
@@ -4505,7 +4507,7 @@ dependencies = [
"clap 4.5.17",
"colored",
"comfy-table",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"cosmwasm-std",
"csv",
"cw-utils",
@@ -4667,7 +4669,7 @@ name = "nym-client-core-gateways-storage"
version = "0.1.0"
dependencies = [
"async-trait",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"log",
"nym-crypto",
"nym-gateway-requests",
@@ -4914,7 +4916,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bls12_381",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"log",
"nym-api-requests",
"nym-credentials-interface",
@@ -5900,7 +5902,6 @@ version = "0.1.0"
dependencies = [
"base64 0.22.1",
"bs58",
"hex",
"serde",
"time",
]
@@ -6274,7 +6275,7 @@ name = "nym-types"
version = "1.0.0"
dependencies = [
"base64 0.22.1",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"cosmwasm-std",
"eyre",
"hmac",
@@ -6307,7 +6308,7 @@ dependencies = [
"bip32",
"bip39",
"colored",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"cosmwasm-std",
"cw-controllers",
"cw-utils",
@@ -6331,9 +6332,8 @@ dependencies = [
"nym-mixnet-contract-common",
"nym-multisig-contract-common",
"nym-network-defaults",
"nym-serde-helpers",
"nym-vesting-contract-common",
"prost 0.13.5",
"prost 0.12.6",
"reqwest 0.12.4",
"serde",
"serde_json",
@@ -6342,7 +6342,6 @@ dependencies = [
"thiserror",
"time",
"tokio",
"tracing",
"ts-rs",
"url",
"wasmtimer",
@@ -6495,14 +6494,14 @@ version = "0.1.0"
dependencies = [
"async-trait",
"const_format",
"cosmrs 0.21.1",
"cosmrs 0.17.0-pre",
"eyre",
"futures",
"humantime 2.1.0",
"serde",
"sha2 0.10.8",
"sqlx",
"tendermint 0.40.1",
"tendermint 0.37.0",
"tendermint-rpc",
"thiserror",
"time",
@@ -7172,16 +7171,6 @@ dependencies = [
"prost-derive 0.12.6",
]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive 0.13.5",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
@@ -7208,19 +7197,6 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "prost-types"
version = "0.11.9"
@@ -8942,9 +8918,9 @@ dependencies = [
[[package]]
name = "tendermint"
version = "0.40.1"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9703e34d940c2a293804752555107f8dbe2b84ec4c6dd5203831235868105d2"
checksum = "954496fbc9716eb4446cdd6d00c071a3e2f22578d62aa03b40c7e5b4fda3ed42"
dependencies = [
"bytes",
"digest 0.10.7",
@@ -8955,7 +8931,8 @@ dependencies = [
"k256",
"num-traits",
"once_cell",
"prost 0.13.5",
"prost 0.12.6",
"prost-types 0.12.6",
"ripemd",
"serde",
"serde_bytes",
@@ -8965,21 +8942,21 @@ dependencies = [
"signature",
"subtle 2.5.0",
"subtle-encoding",
"tendermint-proto 0.40.1",
"tendermint-proto 0.37.0",
"time",
"zeroize",
]
[[package]]
name = "tendermint-config"
version = "0.40.1"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89cc3ea9a39b7ee34eefcff771cc067ecaa0c988c1c5ac08defd878471a06f76"
checksum = "f84b11b57d20ee4492a1452faff85f5c520adc36ca9fe5e701066935255bb89f"
dependencies = [
"flex-error",
"serde",
"serde_json",
"tendermint 0.40.1",
"tendermint 0.37.0",
"toml 0.8.14",
"url",
]
@@ -9004,13 +8981,14 @@ dependencies = [
[[package]]
name = "tendermint-proto"
version = "0.40.1"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ae9e1705aa0fa5ecb2c6aa7fb78c2313c4a31158ea5f02048bf318f849352eb"
checksum = "dc87024548c7f3da479885201e3da20ef29e85a3b13d04606b380ac4c7120d87"
dependencies = [
"bytes",
"flex-error",
"prost 0.13.5",
"prost 0.12.6",
"prost-types 0.12.6",
"serde",
"serde_bytes",
"subtle-encoding",
@@ -9019,9 +8997,9 @@ dependencies = [
[[package]]
name = "tendermint-rpc"
version = "0.40.1"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "835a52aa504c63ec05519e31348d3f4ba2fe79493c588e2cad5323d5e81b161a"
checksum = "dfdc2281e271277fda184d96d874a6fe59f569b130b634289257baacfc95aa85"
dependencies = [
"async-trait",
"async-tungstenite",
@@ -9039,9 +9017,9 @@ dependencies = [
"serde_json",
"subtle 2.5.0",
"subtle-encoding",
"tendermint 0.40.1",
"tendermint 0.37.0",
"tendermint-config",
"tendermint-proto 0.40.1",
"tendermint-proto 0.37.0",
"thiserror",
"time",
"tokio",
@@ -9157,6 +9135,37 @@ dependencies = [
"threadpool",
]
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
dependencies = [
"libc",
"paste",
"tikv-jemalloc-sys",
]
[[package]]
name = "tikv-jemalloc-sys"
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "tikv-jemallocator"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
dependencies = [
"libc",
"tikv-jemalloc-sys",
]
[[package]]
name = "time"
version = "0.3.36"
+8 -4
View File
@@ -5,6 +5,7 @@
panic = "abort"
opt-level = "s"
overflow-checks = true
debug = true
[profile.dev]
panic = "abort"
@@ -365,10 +366,13 @@ cw-controllers = { version = "=1.1.0" }
# cosmrs-related
bip32 = { version = "0.5.2", default-features = false }
cosmrs = { version = "0.21.1" }
tendermint = "0.40.0"
tendermint-rpc = "0.40.0"
prost = { version = "0.13", default-features = false }
# temporarily using a fork again (yay.) because we need staking and slashing support (which are already on main but not released)
# plus response message parsing (which is, as of the time of writing this message, waiting to get merged)
#cosmrs = { path = "../cosmos-rust-fork/cosmos-rust/cosmrs" }
cosmrs = { git = "https://github.com/cosmos/cosmos-rust", rev = "4b1332e6d8258ac845cef71589c8d362a669675a" } # unfortuntely we need a fork by yours truly to get the staking support
tendermint = "0.37.0" # same version as used by cosmrs
tendermint-rpc = "0.37.0" # same version as used by cosmrs
prost = { version = "0.12", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
+1 -1
View File
@@ -10,4 +10,4 @@ pub use registration::{ClientMac, GatewayClient, InitMessage, Nonce};
#[cfg(feature = "verify")]
pub use registration::HmacSha256;
pub const VERSION: u8 = 1;
const VERSION: u8 = 1;
@@ -62,113 +62,8 @@ impl From<v1::registration::GatewayClient> for v2::registration::GatewayClient {
}
}
impl From<v2::registration::GatewayClient> for v1::registration::GatewayClient {
fn from(gw_client: v2::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v1::registration::ClientMac> for v2::registration::ClientMac {
fn from(mac: v1::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v2::registration::ClientMac> for v1::registration::ClientMac {
fn from(mac: v2::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v2::response::AuthenticatorResponse> for v1::response::AuthenticatorResponse {
fn from(authenticator_response: v2::response::AuthenticatorResponse) -> Self {
Self {
version: authenticator_response.protocol.version,
data: authenticator_response.data.into(),
reply_to: authenticator_response.reply_to,
}
}
}
impl From<v2::response::AuthenticatorResponseData> for v1::response::AuthenticatorResponseData {
fn from(authenticator_response_data: v2::response::AuthenticatorResponseData) -> Self {
match authenticator_response_data {
v2::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => v1::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response.into(),
),
v2::response::AuthenticatorResponseData::Registered(registered_response) => {
v1::response::AuthenticatorResponseData::Registered(registered_response.into())
}
v2::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => v1::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response.into(),
),
}
}
}
impl From<v2::response::PendingRegistrationResponse> for v1::response::PendingRegistrationResponse {
fn from(value: v2::response::PendingRegistrationResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v2::response::RegisteredResponse> for v1::response::RegisteredResponse {
fn from(value: v2::response::RegisteredResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v2::response::RemainingBandwidthResponse> for v1::response::RemainingBandwidthResponse {
fn from(value: v2::response::RemainingBandwidthResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.map(Into::into),
}
}
}
impl From<v2::registration::RegistrationData> for v1::registration::RegistrationData {
fn from(value: v2::registration::RegistrationData) -> Self {
Self {
nonce: value.nonce,
gateway_data: value.gateway_data.into(),
wg_port: value.wg_port,
}
}
}
impl From<v2::registration::RegistredData> for v1::registration::RegistredData {
fn from(value: v2::registration::RegistredData) -> Self {
Self {
pub_key: value.pub_key,
private_ip: value.private_ip,
wg_port: value.wg_port,
}
}
}
impl From<v2::registration::RemainingBandwidthData> for v1::registration::RemainingBandwidthData {
fn from(value: v2::registration::RemainingBandwidthData) -> Self {
Self {
available_bandwidth: value.available_bandwidth as u64,
suspended: false,
}
}
}
+1 -1
View File
@@ -6,4 +6,4 @@ pub mod registration;
pub mod request;
pub mod response;
pub const VERSION: u8 = 2;
const VERSION: u8 = 2;
@@ -20,13 +20,11 @@ nym-coconut-bandwidth-contract-common = { path = "../../cosmwasm-smart-contracts
nym-ecash-contract-common = { path = "../../cosmwasm-smart-contracts/ecash-contract" }
nym-multisig-contract-common = { path = "../../cosmwasm-smart-contracts/multisig-contract" }
nym-group-contract-common = { path = "../../cosmwasm-smart-contracts/group-contract" }
nym-serde-helpers = { path = "../../serde-helpers", features = ["hex", "base64"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
nym-http-api-client = { path = "../../../common/http-api-client" }
thiserror = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["sync", "time"] }
time = { workspace = true, features = ["formatting"] }
@@ -5,7 +5,7 @@ use crate::nyxd;
use crate::nyxd::coin::Coin;
use crate::nyxd::cosmwasm_client::helpers::{create_pagination, next_page_key};
use crate::nyxd::cosmwasm_client::types::{
Account, CodeDetails, Contract, ContractCodeId, Model, SequenceResponse, SimulateResponse,
Account, CodeDetails, Contract, ContractCodeId, SequenceResponse, SimulateResponse,
};
use crate::nyxd::error::NyxdError;
use crate::nyxd::Query;
@@ -21,14 +21,15 @@ use cosmrs::proto::cosmos::tx::v1beta1::{
SimulateRequest, SimulateResponse as ProtoSimulateResponse,
};
use cosmrs::proto::cosmwasm::wasm::v1::{
QueryAllContractStateRequest, QueryAllContractStateResponse, QueryCodeRequest,
QueryCodeResponse, QueryCodesRequest, QueryCodesResponse, QueryContractHistoryRequest,
QueryContractHistoryResponse, QueryContractInfoRequest, QueryContractInfoResponse,
QueryContractsByCodeRequest, QueryContractsByCodeResponse, QueryRawContractStateRequest,
QueryRawContractStateResponse, QuerySmartContractStateRequest, QuerySmartContractStateResponse,
QueryCodeRequest, QueryCodeResponse, QueryCodesRequest, QueryCodesResponse,
QueryContractHistoryRequest, QueryContractHistoryResponse, QueryContractInfoRequest,
QueryContractInfoResponse, QueryContractsByCodeRequest, QueryContractsByCodeResponse,
QueryRawContractStateRequest, QueryRawContractStateResponse, QuerySmartContractStateRequest,
QuerySmartContractStateResponse,
};
use cosmrs::tendermint::{block, chain, Hash};
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
use log::trace;
use prost::Message;
use serde::{Deserialize, Serialize};
@@ -67,7 +68,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
Res: Message + Default,
{
if let Some(ref abci_path) = path {
tracing::trace!("performing query on abci path {abci_path}")
trace!("performing query on abci path {abci_path}")
}
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
@@ -153,20 +154,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryAllBalancesRequest {
address: address.to_string(),
pagination,
resolve_denom: false,
};
let mut res = self
.make_abci_query::<_, QueryAllBalancesResponse>(path.clone(), req)
.await?;
let early_break = res.balances.is_empty();
raw_balances.append(&mut res.balances);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -194,13 +188,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryTotalSupplyResponse>(path.clone(), req)
.await?;
let early_break = res.supply.is_empty();
supply.append(&mut res.supply);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -230,19 +218,17 @@ pub trait CosmWasmClient: TendermintRpcClient {
loop {
let mut res = self
.tx_search(query.clone(), false, page, per_page, Order::Ascending)
.tx_search(query.clone(), false, page, 100, Order::Ascending)
.await?;
results.append(&mut res.txs);
// sanity check for if tendermint's maximum per_page was modified -
// we don't want to accidentally be stuck in an infinite loop
let early_break = res.total_count == 0 || res.txs.is_empty();
results.append(&mut res.txs);
if early_break {
if res.total_count == 0 || res.txs.is_empty() {
break;
}
if res.total_count > results.len() as u32 {
if res.total_count >= per_page {
page += 1
} else {
break;
@@ -309,7 +295,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let start = Instant::now();
loop {
tracing::debug!(
log::debug!(
"Polling for result of including {} in a block...",
broadcasted.hash
);
@@ -341,13 +327,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryCodesResponse>(path.clone(), req)
.await?;
let early_break = res.code_infos.is_empty();
raw_codes.append(&mut res.code_infos);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -392,13 +372,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractsByCodeResponse>(path.clone(), req)
.await?;
let early_break = res.contracts.is_empty();
raw_contracts.append(&mut res.contracts);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -454,13 +428,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractHistoryResponse>(path.clone(), req)
.await?;
let early_break = res.entries.is_empty();
raw_entries.append(&mut res.entries);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -474,38 +442,6 @@ pub trait CosmWasmClient: TendermintRpcClient {
.collect::<Result<_, _>>()?)
}
async fn query_all_contract_state(&self, address: &AccountId) -> Result<Vec<Model>, NyxdError> {
let path = Some("/cosmwasm.wasm.v1.Query/AllContractState".to_owned());
let mut models = Vec::new();
let mut pagination = None;
loop {
let req = QueryAllContractStateRequest {
address: address.to_string(),
pagination,
};
let mut res = self
.make_abci_query::<_, QueryAllContractStateResponse>(path.clone(), req)
.await?;
let empty_response = res.models.is_empty();
models.append(&mut res.models);
if empty_response {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
break;
}
}
Ok(models.into_iter().map(Into::into).collect())
}
async fn query_contract_raw(
&self,
address: &AccountId,
@@ -552,7 +488,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QuerySmartContractStateResponse>(path, req)
.await?;
tracing::trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
Ok(serde_json::from_slice(&res.data)?)
}
@@ -27,34 +27,13 @@ use cosmrs::vesting::{
};
use cosmrs::{AccountId, Any, Coin as CosmosCoin};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde::Serialize;
pub use cosmrs::abci::GasInfo;
pub use cosmrs::abci::MsgResponse;
pub type ContractCodeId = u64;
// yet another thing to put in cosmrs
#[derive(Serialize, Deserialize)]
pub struct Model {
#[serde(with = "nym_serde_helpers::hex")]
pub key: Vec<u8>,
#[serde(with = "nym_serde_helpers::base64")]
pub value: Vec<u8>,
}
// follow the cosmwasm serialisation format, i.e. hex for key and base64 for value
impl From<cosmrs::proto::cosmwasm::wasm::v1::Model> for Model {
fn from(model: cosmrs::proto::cosmwasm::wasm::v1::Model) -> Self {
Model {
key: model.key,
value: model.value,
}
}
}
#[derive(Serialize)]
pub struct EmptyMsg {}
@@ -4,11 +4,9 @@
use crate::rpc::TendermintRpcClient;
use async_trait::async_trait;
use base64::Engine;
use cosmrs::tendermint;
use cosmrs::tendermint::{block::Height, evidence::Evidence, Hash};
use reqwest::header::HeaderMap;
use reqwest::{header, RequestBuilder};
use tendermint_rpc::dialect::{v0_34, v0_37, v0_38, LatestDialect};
use tendermint_rpc::{
client::CompatMode,
dialect::{self, Dialect},
@@ -23,21 +21,8 @@ macro_rules! perform_with_compat {
($self:expr, $request:expr) => {{
let request = $request;
match $self.compat {
CompatMode::V0_38 => {
$self
.perform_request_with_dialect(request, dialect::v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
$self
.perform_request_with_dialect(request, dialect::v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
$self
.perform_request_with_dialect(request, dialect::v0_34::Dialect)
.await
}
CompatMode::V0_37 => $self.perform_v0_37(request).await,
CompatMode::V0_34 => $self.perform_v0_34(request).await,
}
}};
}
@@ -85,11 +70,7 @@ impl ReqwestRpcClient {
.headers(headers)
}
async fn perform_request_with_dialect<R, S>(
&self,
request: R,
_dialect: S,
) -> Result<R::Output, Error>
async fn perform_request<R, S>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<S>,
S: Dialect,
@@ -100,25 +81,26 @@ impl ReqwestRpcClient {
.send()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
let response_status = response.status();
let bytes = response
.bytes()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
// Successful JSON-RPC requests are expected to return a 200 OK HTTP status.
// Otherwise, this means that the HTTP request failed as a whole,
// as opposed to the JSON-RPC request returning an error,
// and we cannot expect the response body to be a valid JSON-RPC response.
if response_status != reqwest::StatusCode::OK {
// hehe, that's so nasty but we have to somehow convert between different versions of the same lib
return Err(Error::http_request_failed(
response_status.as_u16().try_into().unwrap(),
));
}
R::Response::from_string(bytes).map(Into::into)
}
async fn perform_v0_34<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_34::Dialect>,
{
self.perform_request(request).await
}
async fn perform_v0_37<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_37::Dialect>,
{
self.perform_request(request).await
}
}
trait TendermintRpcErrorMap {
@@ -138,50 +120,18 @@ impl TendermintRpcClient for ReqwestRpcClient {
where
R: SimpleRequest,
{
self.perform_request_with_dialect(request, LatestDialect)
.await
self.perform_request(request).await
}
async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block::Request::new(height.into()))
perform_with_compat!(self, block_results::Request::new(height.into()))
}
async fn block_by_hash(
&self,
hash: tendermint::Hash,
) -> Result<endpoint::block_by_hash::Response, Error> {
perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
}
async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
perform_with_compat!(self, endpoint::block::Request::default())
}
async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
}
async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
perform_with_compat!(self, endpoint::block_results::Request::default())
}
async fn block_search(
&self,
query: Query,
page: u32,
per_page: u8,
order: Order,
) -> Result<endpoint::block_search::Response, Error> {
perform_with_compat!(
self,
endpoint::block_search::Request::new(query, page, per_page, order)
)
async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
perform_with_compat!(self, block_results::Request::default())
}
async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
@@ -190,26 +140,11 @@ impl TendermintRpcClient for ReqwestRpcClient {
{
let height = height.into();
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block::Request::new(height), v0_34::Dialect)
.await?;
let resp = self.perform_v0_34(block::Request::new(height)).await?;
Ok(resp.into())
}
}
@@ -217,25 +152,12 @@ impl TendermintRpcClient for ReqwestRpcClient {
async fn header_by_hash(&self, hash: Hash) -> Result<header_by_hash::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(header_by_hash::Request::new(hash)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block_by_hash endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block_by_hash::Request::new(hash), v0_34::Dialect)
.perform_v0_34(block_by_hash::Request::new(hash))
.await?;
Ok(resp.into())
}
@@ -245,18 +167,8 @@ impl TendermintRpcClient for ReqwestRpcClient {
/// `/broadcast_evidence`: broadcast an evidence.
async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_34::Dialect)
.await
}
CompatMode::V0_37 => self.perform(evidence::Request::new(e)).await,
CompatMode::V0_34 => self.perform_v0_34(evidence::Request::new(e)).await,
}
}
+1 -22
View File
@@ -10,7 +10,6 @@ use crate::rpc_client::RpcClient;
use crate::storage::{persist_block, ScraperStorage};
use crate::PruningOptions;
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::ops::{Add, Range};
use std::sync::Arc;
@@ -100,15 +99,7 @@ impl BlockProcessor {
})
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self
}
pub(super) async fn process_block(
&mut self,
block: BlockToProcess,
) -> Result<(), ScraperError> {
async fn process_block(&mut self, block: BlockToProcess) -> Result<(), ScraperError> {
info!("processing block at height {}", block.height);
let full_info = self.rpc_client.try_get_full_details(block).await?;
@@ -178,10 +169,6 @@ impl BlockProcessor {
self.msg_modules = modules;
}
pub(super) fn last_process_height(&self) -> u32 {
self.last_processed_height
}
async fn maybe_request_missing_blocks(&mut self) -> Result<(), ScraperError> {
// we're still processing, so we're good
if self.last_processed_at.elapsed() < MAX_MISSING_BLOCKS_DELAY {
@@ -267,7 +254,6 @@ impl BlockProcessor {
}
if to_prune == 0 {
self.last_pruned_height = self.last_processed_height;
return Ok(());
}
@@ -367,14 +353,7 @@ impl BlockProcessor {
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
self.request_missing_blocks(request_range).await?;
@@ -84,7 +84,13 @@ impl TryFrom<Event> for BlockToProcess {
// TODO: we're losing `result_begin_block` and `result_end_block` here but maybe that's fine?
let maybe_block = match event.data {
EventData::NewBlock { block, .. } => block,
// we don't care about `NewBlock` until CometBFT 0.38, i.e. until we upgrade to wasmd 0.50
EventData::NewBlock { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
query,
kind: "NewBlock".to_string(),
})
}
EventData::LegacyNewBlock { block, .. } => block,
EventData::Tx { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
+1 -1
View File
@@ -16,7 +16,7 @@ pub enum ScraperError {
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("the block scraper is already running")]
#[error("can't add any modules to the scraper as it's already running")]
ScraperAlreadyRunning,
#[error("failed to establish websocket connection to {url}: {source}")]
+16 -143
View File
@@ -1,25 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::block_requester::BlockRequester;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::ScraperStorage;
use crate::PruningOptions;
use futures::future::join_all;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::mpsc::{channel, unbounded_channel};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{error, info};
use tracing::info;
use url::Url;
mod subscriber;
@@ -119,7 +115,6 @@ pub struct NyxdScraper {
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
pub storage: ScraperStorage,
rpc_client: RpcClient,
}
impl NyxdScraper {
@@ -130,7 +125,6 @@ impl NyxdScraper {
pub async fn new(config: Config) -> Result<Self, ScraperError> {
config.pruning_options.validate()?;
let storage = ScraperStorage::init(&config.database_path).await?;
let rpc_client = RpcClient::new(&config.rpc_url)?;
Ok(NyxdScraper {
config,
@@ -138,7 +132,6 @@ impl NyxdScraper {
cancel_token: CancellationToken::new(),
startup_sync: Arc::new(Default::default()),
storage,
rpc_client,
})
}
@@ -158,156 +151,36 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
}
pub async fn start(&self) -> Result<(), ScraperError> {
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
let (_, processing_rx) = unbounded_channel();
let (req_tx, _) = channel(5);
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let block = self.rpc_client.get_basic_block_details(height).await?;
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
) -> Result<(), ScraperError> {
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
}
let (_, processing_rx) = unbounded_channel();
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
// otherwise, attempt to resume where we last stopped
// and if we haven't processed anything, start from the current height
if last_processed != 0 {
last_processed
} else {
current_height
}
}
};
let end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
// otherwise, attempt to either go from the start height to the height right
// before the final processed block held in the storage (in case there are gaps)
// or finally, just go to the current block height
if last_processed > starting_height {
last_processed - 1
} else {
current_height
}
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
Ok(())
}
fn new_block_requester(
&self,
req_rx: Receiver<BlockRequest>,
processing_tx: UnboundedSender<BlockToProcess>,
) -> BlockRequester {
BlockRequester::new(
// create the tasks
let block_requester = BlockRequester::new(
self.cancel_token.clone(),
self.rpc_client.clone(),
rpc_client.clone(),
req_rx,
processing_tx.clone(),
)
}
async fn new_block_processor(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
);
let block_processor = BlockProcessor::new(
self.config.pruning_options,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
req_tx,
self.storage.clone(),
self.rpc_client.clone(),
rpc_client,
)
.await
}
async fn new_chain_subscriber(
&self,
processing_tx: UnboundedSender<BlockToProcess>,
) -> Result<ChainSubscriber, ScraperError> {
ChainSubscriber::new(
.await?;
let chain_subscriber = ChainSubscriber::new(
&self.config.websocket_url,
self.cancel_token.clone(),
self.task_tracker.clone(),
processing_tx,
)
.await
}
pub async fn start(&self) -> Result<(), ScraperError> {
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
// create the tasks
let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;
.await?;
// spawn them
self.start_tasks(block_requester, block_processor, chain_subscriber);
@@ -16,7 +16,7 @@ use url::Url;
const MAX_FAILURES: usize = 10;
const MAX_RECONNECTION_ATTEMPTS: usize = 8;
const SOCKET_FAILURE_RESET: Duration = Duration::minutes(15);
const SOCKET_FAILURE_RESET: Duration = Duration::hours(2);
pub struct ChainSubscriber {
cancel: CancellationToken,
+3 -6
View File
@@ -435,12 +435,9 @@ where
trace!("update_last_processed");
let start = Instant::now();
sqlx::query!(
"UPDATE metadata SET last_processed_height = MAX(last_processed_height, ?)",
height
)
.execute(executor)
.await?;
sqlx::query!("UPDATE metadata SET last_processed_height = ?", height)
.execute(executor)
.await?;
log_db_operation_time("update_last_processed", start);
Ok(())
+1 -7
View File
@@ -306,13 +306,7 @@ async fn persist_commits(
} => (validator_address, timestamp, signature),
};
let validator = match crate::helpers::validator_info(*validator_id, validators) {
Ok(validator_info) => validator_info,
Err(err) => {
error!("{err}");
continue;
}
};
let validator = crate::helpers::validator_info(*validator_id, validators)?;
let validator_address = crate::helpers::validator_consensus_address(*validator_id)?;
if signature.is_none() {
-2
View File
@@ -13,13 +13,11 @@ license.workspace = true
[dependencies]
serde = { workspace = true }
hex = { workspace = true, optional = true }
bs58 = { workspace = true, optional = true }
base64 = { workspace = true, optional = true }
time = { workspace = true, features = ["formatting", "parsing"], optional = true }
[features]
hex = ["dep:hex"]
bs58 = ["dep:bs58"]
base64 = ["dep:base64"]
date = ["time"]
-14
View File
@@ -32,20 +32,6 @@ pub mod bs58 {
}
}
#[cfg(feature = "hex")]
pub mod hex {
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S: Serializer>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&::hex::encode(bytes))
}
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
let s = String::deserialize(deserializer)?;
::hex::decode(&s).map_err(serde::de::Error::custom)
}
}
#[cfg(feature = "date")]
pub mod date {
use serde::ser::Error;
+2 -1
View File
@@ -39,7 +39,8 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let task_client = task_client.fork(format!("peer{public_key}"));
let mut task_client = task_client.fork(format!("peer-{public_key}"));
task_client.disarm();
PeerHandle {
storage,
public_key,
-236
View File
@@ -2,242 +2,6 @@
This page displays a full list of all the changes during our release cycle from [`v2024.3-eclipse`](https://github.com/nymtech/nym/blob/nym-binaries-v2024.3-eclipse/CHANGELOG.md) onwards. Operators can find here the newest updates together with links to relevant documentation. The list is sorted so that the newest changes appear first.
## `v2024.11-wedel`
- [Release binaries](https://github.com/nymtech/nym/releases/tag/nym-binaries-v2024.11-wedel)
- [Release CHANGELOG.md](https://github.com/nymtech/nym/blob/nym-binaries-v2024.11-wedel/CHANGELOG.md)
- [`nym-node`](nodes/nym-node.md) version `1.1.8`
```sh
Binary Name: nym-node
Build Timestamp: 2024-09-27T11:02:37.073944654Z
Build Version: 1.1.8
Commit SHA: c3ec970a377adb25d57be5428551fada2ec55128
Commit Date: 2024-09-26T08:24:53.000000000+02:00
Commit Branch: master
rustc Version: 1.80.1
rustc Channel: stable
cargo Profile: release
```
### Features
- [New Network Monitor](https://github.com/nymtech/nym/pull/4610): Monitors the Nym network by sending itself packages across the mixnet. Network monitor is running two tokio tasks, one manages mixnet clients and another manages monitoring itself. Monitor is designed to be driven externally, via an `HTTP api`. This means that it does not do any monitoring unless driven by something like [`locust`](https://locust.io/). This allows us to tailor the load externally, potentially distributing it across multiple monitors. Includes a dockerised setup for automatically spinning up monitor and driving it with locust.
- *Note: NNM is not deployed on mainnet yet!*
- [Add get_mixnodes_described to validator_client](https://github.com/nymtech/nym/pull/4725)
- [Remove deprecated mark_as_success and use new disarm](https://github.com/nymtech/nym/pull/4751): Update function name to keep terminology consistent with tokio `CancellationToken DropGuard`.
- [Update peer refresh value](https://github.com/nymtech/nym/pull/4754): `lso` expose the value by moving it to wireguard types, and separate the refresh time to the database sync time, so that more probable and needed actions happen faster (refresh) and more improbable ones don't overload the system (peer suspended or stale)
~~~admonish example collapsible=true title='Testing steps performed'
- **Noted** that the constants `DEFAULT_PEER_TIMEOUT` and `DEFAULT_PEER_TIMEOUT_CHECK` have been moved to `common/wireguard-types/src/lib.rs` and are now being used across modules for consistency
- **Observed** that the `peer_controller.rs` now separates the in-memory updates from the storage sync operations to reduce system load
- **Identified** that in-memory updates of peer bandwidth usage happen every `DEFAULT_PEER_TIMEOUT_CHECK` (every 5 seconds), while storage updates occur every 5 * `DEFAULT_PEER_TIMEOUT_CHECK` (every 25 seconds)
**Checked System Load and Performance:**
- **Monitored** system resource usage (CPU, memory, I/O) during the test to assess the impact of the changes
- **Confirmed** that the separation of in-memory updates and storage syncs resulted in reduced system load, particularly I/O operations, compared to previous versions where storage updates occurred more frequently
- **Ensured** that the system remained responsive and no performance bottlenecks were introduced
- **Efficiency Improvement:** The separation of in-memory updates and storage syncs effectively reduced unnecessary database writes, improving system efficiency without compromising data accuracy
~~~
- [Remove duplicate stat count for retransmissions](https://github.com/nymtech/nym/pull/4756)
- [Make gateway latency check generic](https://github.com/nymtech/nym/pull/4759): Replace concrete gateway type with trait in latency check, so we can make use of it in the vpn client.
~~~admonish example collapsible=true title='Testing steps performed'
- Initialised new `nym-client` with the `--latency-based-selection` flag and ensured it still works as normal.
~~~
- [chore: remove repetitive words](https://github.com/nymtech/nym/pull/4763)
- [Avoid race on ip and registration structures](https://github.com/nymtech/nym/pull/4766): To avoid a state where the ip is being cleared out before the registration is also cleared out, couple the two structures under the same lock, since they are anyway very inter-dependent.
~~~admonish example collapsible=true title='Testing steps performed'
1. - **Checked out** the release/2024.10-wedel branch containing the fix for the race condition on IP and registration structures
- **Deployed** the on a controlled test environment to prevent interference
2. **Monitored Logs:**
- **Enabled** debug logging to capture all events
- **Monitored** logs in real-time to observe the handling of concurrent registration requests
- **Checked** for any error messages, warnings, or indications of race conditions
3. **Verified Client Responses:**
- Ensured that all clients received appropriate responses:
- Successful registration with assigned IP and registration data
- Appropriate error messages if no IPs were available or if other issues occurred
- Confirmed that no clients were left in an inconsistent state (e.g., assigned an IP but not fully registered)
4. **Validated Normal Operation:**
- **Conducted standard registration processes** with individual clients to confirm that regular functionality is unaffected via `nym-vpn-cli`
- Ensured that authenticated clients could communicate over the network as expected
~~~
- [Persist used wireguard private IPs](https://github.com/nymtech/nym/pull/4771)
- [Enable dependabot version upgrades for root rust workspace](https://github.com/nymtech/nym/pull/4778)
- [Fix clippy for `unwrap_or_default`](https://github.com/nymtech/nym/pull/4783): Fix nightly build for [beta toolchain](https://github.com/nymtech/nym/actions/runs/10552082396/job/29230401668)
- [Update dependabot](https://github.com/nymtech/nym/pull/4796): Bump max number of dependabot rust PRs to 10. Add readme entry to workspace package.
- [Run `cargo-autoinherit` for a few new crates](https://github.com/nymtech/nym/pull/4801): Run cargo-autoinherit for a few new crates - Sort crates list.
- [Add `axum` server to `nym-api`](https://github.com/nymtech/nym/pull/4803): Summary PR to add axum functionality behind a feature flag `axum`, alongside rocket.
- [Remove unused wireguard flag from SDK](https://github.com/nymtech/nym/pull/4823)
- [Expose wireguard details on self described endpoint](https://github.com/nymtech/nym/pull/4825)
~~~admonish example collapsible=true title='Testing steps performed'
Wireguard details are now visible at the nym-node endpoint `/api/v1/gateway/client-interfaces` as well as on the nym-api self-described endpoint `/api/v1/gateways/described`, above the existing data displaying mixnet_websocket information.
An example of what will be shown is:
```json
"wireguard": {
"port": 51822,
"public_key": "<some public key here>"
}
```
~~~
- [Revamped ticketbook serialisation and exposed additional cli methods](https://github.com/nymtech/nym/pull/4827): `wip` branch that includes changes needed for `vpn-api` alongside additional `ecash utils`
~~~admonish example collapsible=true title='Testing steps performed'
Checked the following commands:
```sh
show-ticket-books # which displays the information about all ticketbooks associated to the client
import-ticket-book # which imports a normal ticketbook to the client alongside `--full` flag
```
On the cli, the following were added: `import-coin-index-signatures`, `import-expiration-date-signatures` and `import-master-verification-key`.
~~~
- [Run cargo autoinherit following last weeks dependabot updates](https://github.com/nymtech/nym/pull/4831)
- [Remove serde_crate named import](https://github.com/nymtech/nym/pull/4832)
- [Create nym-repo-setup debian package and nym-vpn meta package](https://github.com/nymtech/nym/pull/4837): Create nym-repo-setup debian package that sets up the nymtech debian repo on the system it's installed on. It does 2 things:
1. Copy the keyring to `/usr/share/keyrings/nymtech.gpg`
2. Copy the repo spec to `/etc/apt/sources.list.d/nymtech.list`
- Also create a meta package `nym-vpn` which only purpose is to depend on the daemon and UI.
~~~admonish example collapsible=true title='Usage'
1. Install with
```sh
sudo dpkg -i ./nym-repo-setup.deb
```
2. Once it's installed, it should be possible to install the vpn client with
```sh
sudo apt install nym-vpnc
```
3. To reemove the repo, use
```sh
sudo apt remove nym-repo-setup
```
NOTE: removing the repo will not remove any installed nym-vpn packages
~~~
~~~admonish example collapsible=true title='Testing steps performed'
1. **Downloaded** the `nym-repo-setup.deb` package to a Debian-based test system
2. **Installed** the repository setup package using the command:
```bash
sudo dpkg -i ./nym-repo-setup.deb
```
3. **Verified** that the GPG keyring was copied to `/usr/share/keyrings/nymtech.gpg`:
```bash
ls -l /usr/share/keyrings/nymtech.gpg
```
4. **Checked** that the repository specification was added to `/etc/apt/sources.list.d/nymtech.list`:
```bash
cat /etc/apt/sources.list.d/nymtech.list
```
5. **Updated** the package list:
```bash
sudo apt update
```
6. **Installed** the VPN client meta-package:
```bash
sudo apt install nym-vpnc
```
7. **Confirmed** that the `nym-vpnc` package and its dependencies (daemon and UI) were installed successfully
8. **Tested** the VPN client to ensure it operates as expected
9. **Removed** the repository setup package:
```bash
sudo apt remove nym-repo-setup
```
10. **Verified** that the repository specification file `/etc/apt/sources.list.d/nymtech.list` was removed
11. **Ensured** that the installed `nym-vpnc` packages remained installed and functional after removing the repo setup package
~~~
- [Use ecash credential type for bandwidth value](https://github.com/nymtech/nym/pull/4840)
- [Start switching over jobs to arc-ubuntu-20.04](https://github.com/nymtech/nym/pull/4843)
~~~admonish example collapsible=true title='`ci-binary-config-checker`'
```
- ci-build-upload-binaries
- ci-build
- ci-cargo-deny
- ci-contracts-schema
- ci-contracts-upload-binaries
- ci-contracts
- ci-docs
- ci-nym-wallet-rust
- ci-sdk-wasm
```
~~~
- [Move credential verification into common crate](https://github.com/nymtech/nym/pull/4853)
- [Revert runner for `ci-docs`](https://github.com/nymtech/nym/pull/4855)
- [Remove `golang` workaround in `ci-sdk-wasm`](https://github.com/nymtech/nym/pull/4858)
- [Fix linux conditional in `ci-build.yml`](https://github.com/nymtech/nym/pull/4863)
- [Disable push trigger and add missing paths in `ci-build`](https://github.com/nymtech/nym/pull/4864)
- [chore: removed completed queued mixnet migration](https://github.com/nymtech/nym/pull/4865)
- [Bump defguard to github latest version](https://github.com/nymtech/nym/pull/4872)
- [Backport #4894 to fix ci](https://github.com/nymtech/nym/pull/4899)
### Bugfix
- [Fix test failure in ipr request size](https://github.com/nymtech/nym/pull/4844): Nightly build started failing due to a unit test using `now()`, changing the serialized size. Fixed to use a fixed date.
- [Fix clippy for nym-wallet and latest rustc](https://github.com/nymtech/nym/pull/4845)
- [Allow updating globally stored signatures](https://github.com/nymtech/nym/pull/4891)
- [Bugfix/ticketbook false double spending](https://github.com/nymtech/nym/pull/4892)
~~~admonish example collapsible=true title='Testing steps performed'
Tested running a client in mixnet mode, with a standard ticketbook, as well as a client using an imported ticketbook. The double spending bug is no longer an issue, bandwidth is consumed properly, and upon consumption of one ticket another ticket is properly obtained.
~~~
### Operators Guide, Tooling & Updates
- [WSS setup guide updates](https://github.com/nymtech/nym/commit/05d6652177fb77324f8c38b3d8a547d07e729fec): Operators setting up WSS and reverse proxy on Gateways have now cleaner and simpler guide to configure their VPS.
- [Updat hostname instruction for WSS](https://github.com/nymtech/nym/commit/7146c4c012ba7012dc74edc8510bbf377dc32fba): Adding a hostname instruction for clarity
## `nym-node` patch from `release/2024.10-caramello`
- [Patch release binaries](https://github.com/nymtech/nym/releases/tag/nym-binaries-v2024.10-caramello-patch)
@@ -73,6 +73,9 @@ pub(crate) enum InitialAuthenticationError {
#[error("Only 'Register' or 'Authenticate' requests are allowed")]
InvalidRequest,
#[error("received a Message of type {typ} which was not expected in this context")]
UnexpectedMessageType { typ: String },
#[error("Experienced connection error: {0}")]
ConnectionError(#[from] WsError),
@@ -861,9 +864,27 @@ where
Message::Binary(_) => {
return Err(InitialAuthenticationError::BinaryRequestWithoutAuthentication);
}
_ => unreachable!(
"the underlying tunsgenite stream should be handling other message types"
),
other => {
if other.is_ping() {
debug!("unexpected ping message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "ping".to_string(),
});
} else if other.is_pong() {
debug!("unexpected pong message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "pong".to_string(),
});
} else if other.is_close() {
debug!("unexpected close message!");
return Err(InitialAuthenticationError::UnexpectedMessageType {
typ: "close".to_string(),
});
}
// at this point this is definitely unreachable, but just in case, let's not panic...
return Err(InitialAuthenticationError::InvalidRequest);
}
};
text.parse()
+7 -2
View File
@@ -76,7 +76,7 @@ axum = { workspace = true, features = ["tokio"], optional = true }
axum-extra = { workspace = true, features = ["typed-header"], optional = true }
tower-http = { workspace = true, features = ["cors", "trace"], optional = true }
utoipa = { workspace = true, features = ["axum_extras", "time"], optional = true }
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true}
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true }
utoipauto = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing = { workspace = true, optional = true }
@@ -112,7 +112,7 @@ cw4 = { workspace = true }
nym-dkg = { path = "../common/dkg", features = ["cw-types"] }
nym-gateway-client = { path = "../common/client-libs/gateway-client" }
nym-inclusion-probability = { path = "../common/inclusion-probability" }
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"]}
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"] }
nym-vesting-contract-common = { path = "../common/cosmwasm-smart-contracts/vesting-contract" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
nym-multisig-contract-common = { path = "../common/cosmwasm-smart-contracts/multisig-contract" }
@@ -129,6 +129,10 @@ nym-node-requests = { path = "../nym-node/nym-node-requests" }
nym-types = { path = "../common/types" }
nym-http-api-common = { path = "../common/http-api-common", features = ["utoipa"] }
tikv-jemallocator = { version = "0.6", optional = true, features = ["profiling"] }
tikv-jemalloc-sys = { version = "0.6", optional = true, features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["use_std", "stats", "profiling"] }
[features]
no-reward = []
generate-ts = ["ts-rs"]
@@ -143,6 +147,7 @@ axum = ["dep:axum",
"nym-http-api-common/utoipa",
"nym-mixnet-contract-common/utoipa"
]
memory-prof = ["tikv-jemallocator", "tikv-jemalloc-ctl", "tikv-jemalloc-sys"]
[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+13 -2
View File
@@ -24,7 +24,7 @@ use circulating_supply_api::cache::CirculatingSupplyCache;
use clap::Parser;
use ecash::dkg::controller::DkgController;
use node_status_api::NodeStatusCache;
use nym_bin_common::logging::setup_logging;
use nym_bin_common::logging::{setup_logging, setup_tracing_logger};
use nym_config::defaults::NymNetworkDetails;
use nym_contract_cache::cache::NymContractCache;
use nym_sphinx::receiver::SphinxMessageReceiver;
@@ -44,6 +44,10 @@ pub(crate) mod nym_nodes;
mod status;
pub(crate) mod support;
#[cfg(feature = "memory-prof")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[cfg(feature = "axum")]
mod v2;
@@ -58,9 +62,16 @@ async fn main() -> Result<(), anyhow::Error> {
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
} else {
setup_tracing_logger();
}}
setup_logging();
// setup_tracing_logger();
// std::env::set_var("MALLOC_CONF", "prof:true,lg_prof_interval:28");
// setup_tracing_logger();
// TODO rocket: replace with tracing logger once rocket is eliminated from code
info!("Starting nym api...");
+131
View File
@@ -0,0 +1,131 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::RocketErrorResponse;
use okapi::openapi3::{OpenApi, Responses};
use rocket::http::Status;
use rocket::response::Responder;
use rocket::{response, Request, Route};
use rocket_okapi::gen::OpenApiGenerator;
use rocket_okapi::response::OpenApiResponderInner;
use rocket_okapi::settings::OpenApiSettings;
use rocket_okapi::util::ensure_status_code_exists;
use rocket_okapi::{openapi, openapi_get_routes_spec};
// code taken from https://github.dev/GreptimeTeam/greptimedb/blob/develop/src/cmd/src/bin/greptime.rs
#[cfg(feature = "memory-prof")]
pub mod memory_prof {
const PROF_DUMP: &[u8] = b"prof.dump\0";
// const OPT_PROF: &[u8] = b"opt.prof\0";
use anyhow::{bail, Context};
use nym_config::{must_get_home, DEFAULT_NYM_APIS_DIR, NYM_DIR};
use std::ffi::{c_char, CString};
use time::OffsetDateTime;
use tokio::fs::create_dir_all;
use tokio::io::AsyncReadExt;
pub async fn dump_profile() -> anyhow::Result<Vec<u8>> {
if !is_prof_enabled()? {
bail!("memory profiling is not enabled")
}
let now = OffsetDateTime::now_utc();
let dump_path = must_get_home()
.join(NYM_DIR)
.join(DEFAULT_NYM_APIS_DIR)
.join("memory_dumps")
.join(format!("{}", now.unix_timestamp()))
.join("nym-api.hprof");
let parent = dump_path.parent().unwrap();
create_dir_all(&parent).await?;
info!("using {} for the memory dump", dump_path.display());
let path = dump_path
.to_str()
.context("the temp dir contained invalid characters")?
.to_string();
let mut bytes = CString::new(path.as_str())
.context("could not construct a CString out of the path")?
.into_bytes_with_nul();
{
// #safety: we always expect a valid temp file path to write profiling data to.
let ptr = bytes.as_mut_ptr() as *mut c_char;
unsafe {
tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr).context(format!(
"failed to dump profiling data to {}",
dump_path.display()
))?
}
}
let mut f = tokio::fs::File::open(path.as_str())
.await
.context("failed to open the dump file")?;
let mut buf = vec![];
let _ = f
.read_to_end(&mut buf)
.await
.context("failed to read the dump file")?;
Ok(buf)
}
fn is_prof_enabled() -> anyhow::Result<bool> {
Ok(tikv_jemalloc_ctl::profiling::prof::read()?)
// Ok(unsafe {
// tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF)
// .context("failed to check the OPT_PROF")?
// })
}
}
pub struct BinaryResponse {
inner: Vec<u8>,
}
impl<'r, 'o: 'r> Responder<'r, 'o> for BinaryResponse {
fn respond_to(self, _req: &'r Request<'_>) -> response::Result<'o> {
let mut res = rocket::Response::new();
res.set_sized_body(self.inner.len(), std::io::Cursor::new(self.inner));
Ok(res)
}
}
impl OpenApiResponderInner for BinaryResponse {
fn responses(_gen: &mut OpenApiGenerator) -> rocket_okapi::Result<Responses> {
let mut responses = Responses::default();
ensure_status_code_exists(&mut responses, 200);
Ok(responses)
}
}
/// foomp
#[cfg(feature = "memory-prof")]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> Result<BinaryResponse, RocketErrorResponse> {
let dump_data = memory_prof::dump_profile()
.await
.map_err(|err| RocketErrorResponse::new(err.to_string(), Status::InternalServerError))?;
Ok(BinaryResponse { inner: dump_data })
}
#[cfg(not(feature = "memory-prof"))]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> RocketErrorResponse {
RocketErrorResponse::new("The 'mem-prof' feature is disabled", Status::NotImplemented)
}
pub(crate) fn api_status_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings:
mem_prof_handler
]
}
+3
View File
@@ -28,6 +28,8 @@ use rocket_okapi::swagger_ui::make_swagger_ui;
pub(crate) mod helpers;
pub(crate) mod openapi;
pub(crate) mod mem_prof;
pub(crate) async fn setup_rocket(
config: &Config,
network_details: NetworkDetails,
@@ -52,6 +54,7 @@ pub(crate) async fn setup_rocket(
"/api-status" => api_status_routes(&openapi_settings),
"/ecash" => ecash::routes_open_api(&openapi_settings, config.coconut_signer.enabled),
"" => nym_node_routes_deprecated(&openapi_settings),
"/prof" => mem_prof::api_status_routes(&openapi_settings),
// => when we move those routes, we'll need to add a redirection for backwards compatibility
"/unstable/nym-nodes" => nym_node_routes_next(&openapi_settings)
-10
View File
@@ -13,8 +13,6 @@ use url::Url;
pub mod build_info;
pub mod init;
pub mod process_block;
pub mod process_until;
pub mod run;
pub mod upgrade_helpers;
@@ -44,8 +42,6 @@ impl Cli {
match self.command {
Commands::Init(args) => init::execute(args),
Commands::Run(args) => run::execute(args).await,
Commands::ProcessBlock(args) => process_block::execute(args).await,
Commands::ProcessUntil(args) => process_until::execute(args).await,
Commands::BuildInfo(args) => build_info::execute(args),
}
}
@@ -101,12 +97,6 @@ pub(crate) enum Commands {
/// Run the validator rewarder with the preconfigured settings.
Run(run::Args),
/// Attempt to process a single block.
ProcessBlock(process_block::Args),
/// Attempt to process multiple blocks until the provided height.
ProcessUntil(process_until::Args),
/// Show build information of this binary
BuildInfo(build_info::Args),
}
@@ -1,32 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
pub struct Args {
#[command(flatten)]
config_override: ConfigOverridableArgs,
/// Height of the block we want to process
#[clap(long)]
height: u32,
/// Specifies custom location for the configuration file of nym validators rewarder.
#[clap(long)]
custom_config_path: Option<PathBuf>,
}
pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
.await?
.process_single_block(args.height)
.await?;
Ok(())
}
@@ -1,45 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
pub struct Args {
#[command(flatten)]
config_override: ConfigOverridableArgs,
/// Optional starting height for processing the blocks.
/// If none is provided, the default behaviour will be applied.
#[clap(long)]
start_height: Option<u32>,
/// Height of until we want to be processing the blocks.
/// If none is provided, the currrent block height will be used
#[clap(long)]
stop_height: Option<u32>,
/// Specifies custom location for the configuration file of nym validators rewarder.
#[clap(long)]
custom_config_path: Option<PathBuf>,
}
pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
if let (Some(start), Some(end)) = (args.start_height, args.stop_height) {
if start > end {
eprintln!("the start height can't be larger than the stop height!");
return Ok(());
}
}
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
.await?
.process_block_range(args.start_height, args.stop_height)
.await?;
Ok(())
}
-4
View File
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::RewardingRatios;
use crate::rewarder::Epoch;
use nym_compact_ecash::error::CompactEcashError;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nym_api::error::NymAPIError;
@@ -179,9 +178,6 @@ pub enum NymRewarderError {
#[error("pruning.keep_recent must not be smaller than {min_to_keep}. got: {keep_recent}")]
TooSmallKeepRecent { min_to_keep: u32, keep_recent: u32 },
#[error("there were no blocks processed within the epoch {epoch}")]
NoBlocksProcessedInEpoch { epoch: Epoch },
}
#[derive(Debug)]
+1 -1
View File
@@ -14,7 +14,7 @@ use nym_network_defaults::setup_env;
pub mod cli;
pub mod config;
pub mod error;
pub mod rewarder;
mod rewarder;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -49,12 +49,7 @@ impl EpochSigning {
) -> Result<Vec<staking::Validator>, NymRewarderError> {
// first attempt to get it via the historical info.
// if that fails, attempt to use current block information to at least get **something**
if let Ok(Some(validators)) = self
.nyxd_client
.historical_info(height)
.await
.map(|v| v.hist)
{
if let Some(validators) = self.nyxd_client.historical_info(height).await?.hist {
Ok(validators.valset)
} else {
let mut page_request = None;
@@ -68,10 +63,6 @@ impl EpochSigning {
break;
};
if pagination.next_key.is_empty() {
break;
}
page_request = Some(PageRequest {
key: pagination.next_key,
offset: 0,
@@ -101,28 +92,18 @@ impl EpochSigning {
let epoch_start = current_epoch.start_time;
let epoch_end = current_epoch.end_time;
let Some(first_block) = self
let first_block = self
.nyxd_scraper
.storage
.get_first_block_height_after(epoch_start)
.await?
else {
return Err(NymRewarderError::NoBlocksProcessedInEpoch {
epoch: current_epoch,
});
};
let Some(last_block) = self
.unwrap_or_default();
let last_block = self
.nyxd_scraper
.storage
.get_last_block_height_before(epoch_end)
.await?
else {
return Err(NymRewarderError::NoBlocksProcessedInEpoch {
epoch: current_epoch,
});
};
.unwrap_or_default();
// each validator MUST be online at some point during the first 20 blocks, otherwise they're not getting anything.
let vp_range_end = min(first_block + 20, last_block);
@@ -3,7 +3,6 @@
use crate::error::NymRewarderError;
use sqlx::FromRow;
use std::fmt::{Display, Formatter};
use std::ops::Add;
use std::time::Duration;
use time::format_description::well_known::Rfc3339;
@@ -35,11 +34,6 @@ impl Epoch {
})
}
pub fn has_finished(&self) -> bool {
let now = OffsetDateTime::now_utc();
self.end_time < now
}
pub fn until_end(&self) -> Duration {
let now = OffsetDateTime::now_utc();
(self.end_time - now).try_into().unwrap_or_default()
@@ -66,15 +60,3 @@ impl Epoch {
self.end_time.format(&Rfc3339).unwrap()
}
}
impl Display for Epoch {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}: {} - {}",
self.id,
self.start_rfc3339(),
self.end_rfc3339()
)
}
}
+11 -86
View File
@@ -7,6 +7,7 @@ use crate::rewarder::block_signing::types::EpochSigningResults;
use crate::rewarder::block_signing::EpochSigning;
use crate::rewarder::credential_issuance::types::CredentialIssuanceResults;
use crate::rewarder::credential_issuance::CredentialIssuance;
use crate::rewarder::epoch::Epoch;
use crate::rewarder::nyxd_client::NyxdClient;
use crate::rewarder::storage::RewarderStorage;
use futures::future::{FusedFuture, OptionFuture};
@@ -27,8 +28,6 @@ mod nyxd_client;
mod storage;
mod tasks;
pub(crate) use crate::rewarder::epoch::Epoch;
pub struct RewardingResult {
pub total_spent: Coin,
pub rewarding_tx: Hash,
@@ -48,30 +47,22 @@ impl EpochRewards {
pub fn amounts(&self) -> Result<Vec<(AccountId, Vec<Coin>)>, NymRewarderError> {
let mut amounts = Vec::new();
match &self.signing {
Ok(Some(signing)) => {
for (account, signing_amount) in signing.rewarding_amounts(&self.signing_budget) {
if signing_amount[0].amount != 0 {
amounts.push((account, signing_amount))
}
if let Ok(Some(signing)) = &self.signing {
for (account, signing_amount) in signing.rewarding_amounts(&self.signing_budget) {
if signing_amount[0].amount != 0 {
amounts.push((account, signing_amount))
}
}
Err(err) => error!("failed to determine rewards for block signing: {err}"),
_ => (),
}
match &self.credentials {
Ok(Some(credentials)) => {
for (account, credential_amount) in
credentials.rewarding_amounts(&self.credentials_budget)
{
if credential_amount[0].amount != 0 {
amounts.push((account, credential_amount))
}
if let Ok(Some(credentials)) = &self.credentials {
for (account, credential_amount) in
credentials.rewarding_amounts(&self.credentials_budget)
{
if credential_amount[0].amount != 0 {
amounts.push((account, credential_amount))
}
}
Err(err) => error!("failed to determine rewards for credential issuance: {err}"),
_ => (),
}
Ok(amounts)
@@ -288,58 +279,6 @@ impl Rewarder {
self.current_epoch = self.current_epoch.next();
}
async fn ensure_has_epoch_blocks(&self) -> Result<(), NymRewarderError> {
// make sure we at least have a single block processed within the epoch
let epoch_start = self.current_epoch.start_time;
let epoch_end = self.current_epoch.end_time;
if let Some(epoch_signing) = &self.epoch_signing {
if epoch_signing
.nyxd_scraper
.storage
.get_first_block_height_after(epoch_start)
.await?
.is_none()
{
return Err(NymRewarderError::NoBlocksProcessedInEpoch {
epoch: self.current_epoch,
});
}
if epoch_signing
.nyxd_scraper
.storage
.get_last_block_height_before(epoch_end)
.await?
.is_none()
{
return Err(NymRewarderError::NoBlocksProcessedInEpoch {
epoch: self.current_epoch,
});
}
}
Ok(())
}
async fn startup_resync(&mut self) -> Result<(), NymRewarderError> {
// no sync required
if !self.current_epoch.has_finished() {
return Ok(());
}
info!("attempting to distribute missed rewards");
while self.current_epoch.has_finished() {
info!("processing epoch {}", self.current_epoch);
self.ensure_has_epoch_blocks().await?;
// we need to perform rewarding from the 'current' epoch until the actual current epoch
self.handle_epoch_end().await
}
Ok(())
}
pub async fn run(mut self) -> Result<(), NymRewarderError> {
info!("Starting nym validators rewarder");
@@ -367,20 +306,6 @@ impl Rewarder {
let until_end = self.current_epoch.until_end();
if let Err(err) = self.startup_resync().await {
error!("failed to perform startup sync: {err}");
error!("if the failure was due to insufficient number of blocks, your course of action is as follows:");
error!("(ideally it would have been automatically resolved in this very method, but that'd require some serious refactoring)");
error!(
"1. determine height of the first block of the epoch (doesn't have to be exact)"
);
error!("2. run the following subcommand of the rewarder: `nym-validator-rewarder process-until --start-height=$STARTING_BLOCK");
error!("3. !!IMPORTANT!! go to config.toml and temporarily disable block pruning, i.e. `pruning.strategy=nothing`");
error!("4. restart nym-validator-rewarder as normal until it sends missing rewards");
error!("5. re-enable pruning and restart the nym-validator rewarder");
return Err(err);
}
info!(
"the initial epoch (id: {}) will finish in {} secs",
self.current_epoch.id,