Compare commits

..

8 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 351acb7875 expose cancelled 2025-10-13 15:46:29 +01:00
Jędrzej Stuczyński 7f4ef7f772 add drop guard to client tasks 2025-10-13 15:45:46 +01:00
Jędrzej Stuczyński 5025c49a0e using same hierarchy of trackers for client shutdown control 2025-10-13 15:45:46 +01:00
Jędrzej Stuczyński 422f889df7 bugfix: testnet manager 02sql migration (#6096) 2025-10-10 09:38:45 +01:00
Jędrzej Stuczyński c9e96edc35 chore: remove unnecessary closure in 'calculate_score' inside node-status-api 2025-10-09 15:46:15 +01:00
benedetta davico 7768317046 Merge pull request #6095 from nymtech/bugfix/ns-api-download-filesize
ns-api: use download files size from probes instead of parsing filenames
2025-10-08 18:14:00 +02:00
Mark Sinclair 0ebbb1a540 ns-api: use download files size from probes instead of parsing filenames 2025-10-08 17:05:56 +01:00
Jędrzej Stuczyński 827c13b69e moved nym-gateway-probe to monorepo and updated rust-edition to 2024 (#6094)
dont build netstack in CI

additional rust 2024 fixes

fixes

removed temp.rs

first round of cleanup

removed duplicated NS types

moved gateway probe to the monorepo
2025-10-08 16:17:43 +01:00
42 changed files with 497 additions and 143 deletions
Generated
+1 -1
View File
@@ -6598,7 +6598,7 @@ dependencies = [
[[package]]
name = "nym-node-status-api"
version = "4.0.7-test2"
version = "4.0.7"
dependencies = [
"ammonia",
"anyhow",
@@ -341,8 +341,14 @@ where
debug_config.cover_traffic,
stats_tx,
);
shutdown_tracker
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
stream.run().await
},
"CoverTrafficStream",
);
}
#[allow(clippy::too_many_arguments)]
@@ -419,8 +425,10 @@ where
"AcknowledgementController::RetransmissionRequestListener",
);
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
let _ = drop_guard;
sent_notification_listener.run().await;
},
"AcknowledgementController::SentNotificationListener",
@@ -431,8 +439,6 @@ where
async move { ack_action_controller.run(shutdown_token).await },
"AcknowledgementController::ActionController",
);
// .start(packet_type);
}
// buffer controlling all messages fetched from provider
@@ -705,8 +711,12 @@ where
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
let drop_guard = shutdown_tracker.clone_shutdown_token().drop_guard();
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
async move {
let _ = drop_guard;
topology_refresher.run().await
},
"TopologyRefresher",
);
}
@@ -892,7 +902,7 @@ where
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::get_sdk_shutdown_tracker()?,
};
@@ -926,7 +936,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -936,7 +946,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
@@ -956,7 +966,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -964,7 +974,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
)
.await?;
@@ -975,7 +985,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -983,10 +993,8 @@ where
// traffic stream.
// The MixTrafficController then sends the actual traffic
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
);
let (message_sender, client_request_sender) =
Self::start_mix_traffic_controller(gateway_transceiver, &shutdown_tracker);
// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
@@ -1015,7 +1023,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
if !self
@@ -1031,7 +1039,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker,
);
}
@@ -138,6 +138,7 @@ impl MixTrafficController {
pub async fn run(&mut self) {
debug!("Started MixTrafficController with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -80,6 +80,7 @@ impl AcknowledgementListener {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started AcknowledgementListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -245,6 +245,7 @@ impl ActionController {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ActionController with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -216,6 +216,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -167,6 +167,7 @@ where
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -585,6 +585,7 @@ where
// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
let _drop_guard = shutdown_token.clone().drop_guard();
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
@@ -497,6 +497,8 @@ impl<R: MessageReceiver> RequestReceiver<R> {
pub(crate) async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -540,6 +542,8 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
let _drop_guard = self.shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -152,6 +152,7 @@ where
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -119,6 +119,8 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
let _drop_guard = shutdown_token.clone().drop_guard();
loop {
tokio::select! {
biased;
@@ -46,6 +46,7 @@ where
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
shutdown.cancel();
error!("failed to start the storage session - {err}");
return;
}
@@ -89,6 +89,8 @@ impl PartiallyDelegatedRouter {
async fn run(mut self, mut split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
let mut chunked_stream = (&mut split_stream).ready_chunks(8);
let drop_guard = shutdown_token.clone().drop_guard();
let ret: Result<_, GatewayClientError> = loop {
tokio::select! {
biased;
@@ -101,6 +103,7 @@ impl PartiallyDelegatedRouter {
// received request to stop the task and return the stream
_ = &mut self.stream_return_requester => {
log::debug!("received request to return the split ws stream");
drop_guard.disarm();
break Ok(())
}
socket_msgs = chunked_stream.next() => {
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "\n SELECT epoch_id as \"epoch_id: u32\", serialised_key, serialization_revision as \"serialization_revision: u8\"\n FROM master_verification_key WHERE epoch_id = ?\n ",
"describe": {
"columns": [
{
"name": "epoch_id: u32",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "serialised_key",
"ordinal": 1,
"type_info": "Blob"
},
{
"name": "serialization_revision: u8",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "0112296b190328a3856d1adf51aafa2525da6c0b871633aad80ad555db9cf47c"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO ecash_deposit_usage (deposit_id, ticketbooks_requested_on, client_pubkey, request_uuid)\n VALUES (?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 4
},
"nullable": []
},
"hash": "1fc72f8ba24039548047e1766c9105614dea7fd301f0ec38bfe85bfe546dad40"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n ORDER BY\n node_id\n ",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n AND\n bond_info IS NOT NULL\n ",
"describe": {
"columns": [
{
@@ -62,11 +62,6 @@
"ordinal": 11,
"name": "bond_info: serde_json::Value",
"type_info": "Jsonb"
},
{
"ordinal": 12,
"name": "http_api_port",
"type_info": "Int4"
}
],
"parameters": {
@@ -84,9 +79,8 @@
true,
false,
true,
true,
true
]
},
"hash": "3ddc12cc4e1796b787a50c40560d2bd71d1cfe5f5265e6f161b3122d1317a421"
"hash": "283f49a65c7d70bf271702ff6a5c7ad6e68c81932d295ff18ed198c54706a57c"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n DELETE FROM blinded_shares WHERE created < ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "28681fcd8e2d4326f628681b8f2a317aabce063a650be362d3a8ed83cc7c3549"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO global_expiration_date_signatures(expiration_date, epoch_id, serialised_signatures, serialization_revision)\n VALUES (?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 4
},
"nullable": []
},
"hash": "2930ca6e3875c74acb7abb9ad889f166ad7f57681f76a1d0c7723d007c1f2c1e"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "\n SELECT error_message\n FROM blinded_shares\n WHERE id = ?;\n ",
"describe": {
"columns": [
{
"name": "error_message",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true
]
},
"hash": "396f40c33f0f62796eb7449d640bd97845350f4fb9f806c60b93c7cebd5e410d"
}
@@ -0,0 +1,26 @@
{
"db_name": "SQLite",
"query": "\n SELECT serialised_signatures, serialization_revision as \"serialization_revision: u8\"\n FROM global_expiration_date_signatures\n WHERE expiration_date = ? AND epoch_id = ?\n ",
"describe": {
"columns": [
{
"name": "serialised_signatures",
"ordinal": 0,
"type_info": "Blob"
},
{
"name": "serialization_revision: u8",
"ordinal": 1,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [
false,
false
]
},
"hash": "3cc446220668fb3e02f0578104291d2a2af57656b405212af414d765b2263347"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n DELETE FROM partial_blinded_wallet_failure WHERE created < ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "52b378e282d93db941eff53b5b311e5732ece0bf84ea98f2328b20add8f2b5ef"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO master_verification_key(epoch_id, serialised_key, serialization_revision) VALUES (?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "70d8f240ad6edda6b8c7f2e800e7fca89d80869484f2f3c66cabb898f0298c62"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO partial_blinded_wallet_failure(corresponding_deposit, epoch_id, expiration_date, node_id, created, failure_message)\n VALUES (?, ?, ?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "97d97ebb6bc8f4114fdea9ebc9f57f91a11f5057273cb70bd0e629712d17dd41"
}
@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "\n SELECT epoch_id as \"epoch_id: u32\", serialised_signatures, serialization_revision as \"serialization_revision: u8\"\n FROM global_coin_index_signatures WHERE epoch_id = ?\n ",
"describe": {
"columns": [
{
"name": "epoch_id: u32",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "serialised_signatures",
"ordinal": 1,
"type_info": "Blob"
},
{
"name": "serialization_revision: u8",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false
]
},
"hash": "a8b7ce0fe4755c28b96d1e503e313ab15fed747fb0cee1c9f949fb58461b3f79"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n DELETE FROM partial_blinded_wallet WHERE created < ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "b8257a0832d0124f0a8aaaf81dc6a811c593aea8febf1f891117e5e84213f147"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n t1.node_id as \"node_id!\",\n t1.blinded_signature as \"blinded_signature!\",\n t1.epoch_id as \"epoch_id!\",\n t1.expiration_date as \"expiration_date!: Date\"\n FROM partial_blinded_wallet as t1\n JOIN ecash_deposit_usage as t2\n on t1.corresponding_deposit = t2.deposit_id\n JOIN blinded_shares as t3\n ON t2.request_uuid = t3.request_uuid\n WHERE t3.device_id = ? AND t3.credential_id = ?;\n ",
"describe": {
"columns": [
{
"name": "node_id!",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "blinded_signature!",
"ordinal": 1,
"type_info": "Blob"
},
{
"name": "epoch_id!",
"ordinal": 2,
"type_info": "Integer"
},
{
"name": "expiration_date!: Date",
"ordinal": 3,
"type_info": "Date"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
true,
true,
true
]
},
"hash": "c2b841762bdb963fff337ef5c8ec9f560017b4da6b0303ea0397d9568229e167"
}
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\",\n http_api_port\n FROM\n nym_nodes\n WHERE\n self_described IS NOT NULL\n AND\n bond_info IS NOT NULL\n ",
"query": "SELECT\n node_id,\n ed25519_identity_pubkey,\n total_stake,\n ip_addresses as \"ip_addresses!: serde_json::Value\",\n mix_port,\n x25519_sphinx_pubkey,\n node_role as \"node_role: serde_json::Value\",\n supported_roles as \"supported_roles: serde_json::Value\",\n entry as \"entry: serde_json::Value\",\n performance,\n self_described as \"self_described: serde_json::Value\",\n bond_info as \"bond_info: serde_json::Value\"\n FROM\n nym_nodes\n ORDER BY\n node_id\n ",
"describe": {
"columns": [
{
@@ -62,11 +62,6 @@
"ordinal": 11,
"name": "bond_info: serde_json::Value",
"type_info": "Jsonb"
},
{
"ordinal": 12,
"name": "http_api_port",
"type_info": "Int4"
}
],
"parameters": {
@@ -84,9 +79,8 @@
true,
false,
true,
true,
true
]
},
"hash": "0b51df277ed66c6553f66af9b135342dee177abc1c92e4a89147de3c22d3d1a5"
"hash": "c48d04fc3de59dd484f0a63d40336ced54e08785f77e9ef85f3157d004ec85dc"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO global_coin_index_signatures(epoch_id, serialised_signatures, serialization_revision) VALUES (?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "d3510846941fa2525926b9bfbcdabd806877ce914b514d4f7cd6be318c4debe6"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO partial_blinded_wallet(corresponding_deposit, epoch_id, expiration_date, node_id, created, blinded_signature)\n VALUES (?, ?, ?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "db176e98198fe594d88eb860d918f633a94d18a19b7f0f96935a62560def7d0f"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n UPDATE ecash_deposit_usage\n SET ticketbook_request_error = ?\n WHERE deposit_id = ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "e584253e3856355899537eb8fc152f2bfed2d918b894ec0f588e38dd5e8ad726"
}
@@ -0,0 +1,38 @@
{
"db_name": "SQLite",
"query": "\n SELECT t1.node_id, t1.blinded_signature, t1.epoch_id, t1.expiration_date as \"expiration_date!: Date\"\n FROM partial_blinded_wallet as t1\n JOIN ecash_deposit_usage as t2\n on t1.corresponding_deposit = t2.deposit_id\n JOIN blinded_shares as t3\n ON t2.request_uuid = t3.request_uuid\n WHERE t3.id = ?;\n ",
"describe": {
"columns": [
{
"name": "node_id",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "blinded_signature",
"ordinal": 1,
"type_info": "Blob"
},
{
"name": "epoch_id",
"ordinal": 2,
"type_info": "Integer"
},
{
"name": "expiration_date!: Date",
"ordinal": 3,
"type_info": "Date"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "e77ffab19b099b84470fe5611716a2e314787586a46cffd074abb67f2f4d109e"
}
@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "\n SELECT error_message\n FROM blinded_shares\n WHERE device_id = ? AND credential_id = ?;\n ",
"describe": {
"columns": [
{
"name": "error_message",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true
]
},
"hash": "ef60c2683211cc4ec2d3e46392518a1f62fa67dfe8f130deb876ebee11bf1602"
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "4.0.7-test2"
version = "4.0.7"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -1,2 +0,0 @@
ALTER TABLE nym_nodes
ADD COLUMN http_api_port INTEGER;
@@ -381,7 +381,7 @@ impl ScrapeNodeKind {
pub(crate) struct ScraperNodeInfo {
pub node_kind: ScrapeNodeKind,
pub hosts: Vec<String>,
pub http_api_port: Option<u16>,
pub http_api_port: i64,
}
impl ScraperNodeInfo {
@@ -395,21 +395,8 @@ impl ScraperNodeInfo {
format!("http://{}", host),
]);
if let Some(custom_http_api_port) = self.http_api_port {
urls = Vec::new();
for host in &self.hosts {
urls.append(&mut vec![format!(
"http://{}:{}",
host, custom_http_api_port
)]);
}
// do not fall back to default ports, if the operator sets a custom http api port
// in their bond, use it and error out if it's not available
// this will correctly handle cases where some operators run multiple nodes
// on a single IP address and assign different custom http port apis at bond time
// urls.insert(0, format!("http://{}:{}", host, custom_http_api_port));
if self.http_api_port != DEFAULT_NYM_NODE_HTTP_PORT as i64 {
urls.insert(0, format!("http://{}:{}", host, self.http_api_port));
}
}
@@ -436,7 +423,6 @@ pub(crate) struct NymNodeDto {
pub performance: String,
pub self_described: Option<serde_json::Value>,
pub bond_info: Option<serde_json::Value>,
pub http_api_port: Option<i32>,
}
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
@@ -454,7 +440,6 @@ pub(crate) struct NymNodeInsertRecord {
pub entry: Option<serde_json::Value>,
pub self_described: Option<serde_json::Value>,
pub bond_info: Option<serde_json::Value>,
pub http_api_port: Option<i32>,
pub last_updated_utc: i64,
}
@@ -471,12 +456,6 @@ impl NymNodeInsertRecord {
.map(|info| decimal_to_i64(info.total_stake()))
.unwrap_or(0);
let entry = serialize_opt_to_value!(skimmed_node.entry)?;
let http_api_port = bond_info.and_then(|bond| {
bond.bond_information
.node
.custom_http_port
.map(|port| port as i32)
});
let bond_info = serialize_opt_to_value!(bond_info)?;
let self_described = serialize_opt_to_value!(self_described)?;
@@ -493,7 +472,6 @@ impl NymNodeInsertRecord {
entry,
self_described,
bond_info,
http_api_port,
last_updated_utc: now,
};
@@ -35,8 +35,7 @@ pub(crate) async fn get_all_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<NymNo
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
ORDER BY
@@ -73,8 +72,7 @@ pub(crate) async fn get_described_bonded_nym_nodes(
entry as "entry: serde_json::Value",
performance,
self_described as "self_described: serde_json::Value",
bond_info as "bond_info: serde_json::Value",
http_api_port
bond_info as "bond_info: serde_json::Value"
FROM
nym_nodes
WHERE
@@ -21,11 +21,10 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
let skimmed_nodes = queries::get_described_bonded_nym_nodes(pool)
.await
.map(|nodes_dto| {
nodes_dto.into_iter().filter_map(|node_dto| {
let node_id = node_dto.node_id;
let http_api_port = node_dto.http_api_port;
match SkimmedNode::try_from(node_dto) {
Ok(node) => Some((node, http_api_port)),
nodes_dto.into_iter().filter_map(|node| {
let node_id = node.node_id;
match SkimmedNode::try_from(node) {
Ok(node) => Some(node),
Err(e) => {
tracing::error!("Failed to decode node_id={}: {}", node_id, e);
None
@@ -34,7 +33,7 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
})
})?;
skimmed_nodes.for_each(|(node, http_api_port)| {
skimmed_nodes.for_each(|node| {
// TODO: relies on polyfilling: Nym nodes table might contain legacy mixnodes
// as well. Categorize them here.
let node_kind = if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
@@ -55,7 +54,7 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
.into_iter()
.map(|ip| ip.to_string())
.collect::<Vec<_>>(),
http_api_port: http_api_port.map(|port| port as u16),
http_api_port: node.mix_port.into(),
})
});
@@ -138,7 +138,6 @@ mod db_tests {
performance: "1.0".to_string(),
self_described: None,
bond_info: None,
http_api_port: None,
};
let skimmed_node: nym_validator_client::nym_api::SkimmedNode =
@@ -363,42 +362,22 @@ fn test_scraper_node_info_contact_addresses() {
let node_info = ScraperNodeInfo {
node_kind: ScrapeNodeKind::MixingNymNode { node_id: 123 },
hosts: vec!["1.1.1.1".to_string(), "example.com".to_string()],
http_api_port: None,
http_api_port: 8080,
};
let addresses = node_info.contact_addresses();
// Should generate multiple URLs for each host
// When no custom port is specified only default ports should be used
// Custom port (8080) should be inserted at the beginning
assert!(addresses.contains(&"http://1.1.1.1:8080".to_string()));
assert!(addresses.contains(&"http://example.com:8080".to_string()));
assert!(addresses.contains(&"http://1.1.1.1:8000".to_string()));
assert!(addresses.contains(&"https://1.1.1.1".to_string()));
assert!(addresses.contains(&"http://1.1.1.1".to_string()));
assert!(addresses.contains(&"http://example.com:8000".to_string()));
// Check that URLs follow the expected pattern
assert!(addresses.len() >= 8); // At least 4 URLs per host
}
#[test]
fn test_scraper_node_info_contact_addresses_with_custom_http_api_port() {
use crate::db::models::{ScrapeNodeKind, ScraperNodeInfo};
let node_info = ScraperNodeInfo {
node_kind: ScrapeNodeKind::MixingNymNode { node_id: 123 },
hosts: vec!["1.1.1.1".to_string(), "example.com".to_string()],
http_api_port: Some(4444),
};
let addresses = node_info.contact_addresses();
// Should generate multiple URLs for each host
// Custom port (4444) should be the only port in the list
assert!(addresses.contains(&"http://1.1.1.1:4444".to_string()));
assert!(addresses.contains(&"http://example.com:4444".to_string()));
// Check that URLs follow the expected pattern
assert!(addresses.len() >= 2); // At least 4 URLs per host
}
#[test]
fn test_scrape_node_kind_node_id() {
use crate::db::models::ScrapeNodeKind;
@@ -435,7 +414,6 @@ fn test_nym_node_dto_with_invalid_keys() {
performance: "1.0".to_string(),
self_described: None,
bond_info: None,
http_api_port: None,
};
let result: Result<nym_validator_client::nym_api::SkimmedNode, _> = nym_node_dto.try_into();
@@ -473,7 +451,6 @@ fn test_nym_node_dto_with_invalid_performance() {
performance: "invalid_percent".to_string(),
self_described: None,
bond_info: None,
http_api_port: None,
};
let result: Result<nym_validator_client::nym_api::SkimmedNode, _> = nym_node_dto.try_into();
@@ -395,24 +395,20 @@ fn calculate_score(gateway: &Gateway, probe_outcome: &LastProbeResult) -> ScoreV
.map(|p| {
let ping_ips_performance = p.ping_ips_performance_v4 as f64;
let duration_sec =
p.download_duration_milliseconds_v4
.unwrap_or_else(|| p.download_duration_sec_v4 * 1000) as f64
/ 1000f64;
let duration = p.download_duration_sec_v4 as f64;
// get the file size downloaded in bytes and convert to MB, or default to 1MB
let file_size_mb = p.downloaded_file_size_bytes_v4.unwrap_or_else(|| 1048576) as f64
/ 1024f64
/ 1024f64;
let speed_mbps = file_size_mb / duration_sec;
let file_size_mb =
p.downloaded_file_size_bytes_v4.unwrap_or(1048576) as f64 / 1024f64 / 1024f64;
let speed_mbps = file_size_mb / duration;
let file_download_score = if speed_mbps > 10.0 {
let file_download_score = if speed_mbps > 100.0 {
1.0
} else if speed_mbps > 5.0 {
} else if speed_mbps > 50.0 {
0.75
} else if speed_mbps > 2.0 {
} else if speed_mbps > 20.0 {
0.5
} else if speed_mbps > 1.0 {
} else if speed_mbps > 10.0 {
0.25
} else {
0.1
+1 -1
View File
@@ -759,7 +759,7 @@ where
client_output,
client_state.clone(),
nym_address,
started_client.shutdown_handle.child_tracker(),
started_client.shutdown_handle.clone(),
packet_type,
);
@@ -25,6 +25,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLockReadGuard;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::WaitForCancellationFutureOwned;
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -273,6 +274,12 @@ impl MixnetClient {
}
}
}
pub fn cancelled(&self) -> WaitForCancellationFutureOwned {
self.shutdown_handle
.clone_shutdown_token()
.cancelled_owned()
}
}
#[derive(Clone)]
@@ -3,48 +3,110 @@
* SPDX-License-Identifier: GPL-3.0-only
*/
-- 1. Rename old table to preserve data
ALTER TABLE network
RENAME TO network_old;
CREATE TABLE network_old
-- 2. Insert placeholder account (so that old networks would have _some_ value for performance contract)
INSERT INTO account (address, mnemonic)
VALUES ('n1tq2kggc6y44yqmnafh98vexxav8666cfkgvygf',
'opinion scene salon slice noise easy security drift brown custom verb express old matrix mammal choose attract trash general staff manual elite destroy strategy');
-- 3. Insert placeholder contract and record its id
INSERT INTO contract (name, address, admin_address)
VALUES ('placeholder', 'n14gl07zh58rydd4k9tyw320zvqd79vrwnjj4x9g', 'n1tq2kggc6y44yqmnafh98vexxav8666cfkgvygf');
CREATE TEMP TABLE tmp_placeholder
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
mixnet_contract_id INTEGER NOT NULL REFERENCES contract (id),
vesting_contract_id INTEGER NOT NULL REFERENCES contract (id),
ecash_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw3_multisig_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw4_group_contract_id INTEGER NOT NULL REFERENCES contract (id),
dkg_contract_id INTEGER NOT NULL REFERENCES contract (id),
rewarder_address TEXT NOT NULL REFERENCES account (address),
ecash_holding_account_address TEXT NOT NULL REFERENCES account (address)
id INTEGER NOT NULL
);
INSERT INTO tmp_placeholder
VALUES (last_insert_rowid());
INSERT INTO network_old
SELECT *
from network;
DROP TABLE network;
-- 4. Create the new network table with the new column
CREATE TABLE network
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
mixnet_contract_id INTEGER NOT NULL REFERENCES contract (id),
vesting_contract_id INTEGER NOT NULL REFERENCES contract (id),
ecash_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw3_multisig_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw4_group_contract_id INTEGER NOT NULL REFERENCES contract (id),
dkg_contract_id INTEGER NOT NULL REFERENCES contract (id),
performance_contract_id INTEGER NOT NULL REFERENCES contract (id),
mixnet_contract_id INTEGER NOT NULL REFERENCES contract (id),
vesting_contract_id INTEGER NOT NULL REFERENCES contract (id),
ecash_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw3_multisig_contract_id INTEGER NOT NULL REFERENCES contract (id),
cw4_group_contract_id INTEGER NOT NULL REFERENCES contract (id),
dkg_contract_id INTEGER NOT NULL REFERENCES contract (id),
performance_contract_id INTEGER NOT NULL REFERENCES contract (id),
rewarder_address TEXT NOT NULL REFERENCES account (address),
ecash_holding_account_address TEXT NOT NULL REFERENCES account (address)
rewarder_address TEXT NOT NULL REFERENCES account (address),
ecash_holding_account_address TEXT NOT NULL REFERENCES account (address)
);
-- 5. Copy existing data into the new table
INSERT INTO network(id, name, created_at,
mixnet_contract_id, vesting_contract_id, ecash_contract_id,
cw3_multisig_contract_id, cw4_group_contract_id, dkg_contract_id,
performance_contract_id,
rewarder_address, ecash_holding_account_address)
SELECT n.id,
n.name,
n.created_at,
n.mixnet_contract_id,
n.vesting_contract_id,
n.ecash_contract_id,
n.cw3_multisig_contract_id,
n.cw4_group_contract_id,
n.dkg_contract_id,
t.id, -- use the placeholder contract id
n.rewarder_address,
n.ecash_holding_account_address
FROM network_old AS n
CROSS JOIN tmp_placeholder AS t;
-- 6. recreate metadata table due to change in FK
ALTER TABLE metadata
RENAME TO metadata_old;
CREATE TABLE metadata
(
id INTEGER PRIMARY KEY CHECK (id = 0),
latest_network_id INTEGER REFERENCES network (id),
master_mnemonic TEXT NOT NULL,
rpc_endpoint TEXT NOT NULL
);
INSERT INTO metadata
SELECT *
FROM metadata_old;
-- 7. recreate node table due to change in FK
ALTER Table node
RENAME TO node_old;
CREATE TABLE node
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
identity_key TEXT NOT NULL,
network_id INTEGER NOT NULL REFERENCES network (id),
-- i.e. mixnode or gateway
bonded_type TEXT NOT NULL,
owner_address TEXT NOT NULL REFERENCES account (address)
);
INSERT INTO node
SELECT *
FROM node_old;
-- 8. Clean up
DROP TABLE tmp_placeholder;
DROP TABLE metadata_old;
DROP TABLE node_old;
DROP TABLE network_old;
CREATE TABLE authorised_network_monitor
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,