NetworkMonitorBuilder - starting the monitor after rocket has launched (#754)

* NetworkMonitorBuilder - starting the monitor after rocket has launched

* Removed unused import
This commit is contained in:
Jędrzej Stuczyński
2021-08-27 13:52:18 +01:00
committed by GitHub
parent 1811df9ddb
commit 5f42a9bd05
2 changed files with 134 additions and 103 deletions
+37 -24
View File
@@ -7,7 +7,7 @@ extern crate rocket;
use crate::cache::ValidatorCacheRefresher;
use crate::config::Config;
use crate::network_monitor::tested_network::good_topology::parse_topology_file;
use crate::network_monitor::{new_monitor_runnables, NetworkMonitorRunnables};
use crate::network_monitor::NetworkMonitorBuilder;
use crate::nymd_client::Client;
use crate::storage::NodeStatusStorage;
use ::config::{defaults::DEFAULT_VALIDATOR_API_PORT, NymConfig};
@@ -16,10 +16,13 @@ use cache::ValidatorCache;
use clap::{App, Arg, ArgMatches};
use coconut::InternalSignRequest;
use log::info;
use rocket::fairing::AdHoc;
use rocket::http::Method;
use rocket::{Ignite, Rocket};
use rocket_cors::{AllowedHeaders, AllowedOrigins, Cors};
use std::process;
use std::sync::Arc;
use tokio::sync::Notify;
use url::Url;
pub(crate) mod cache;
@@ -220,10 +223,16 @@ fn setup_cors() -> Result<Cors> {
Ok(cors)
}
async fn setup_network_monitor(
config: &Config,
fn setup_liftoff_notify(notify: Arc<Notify>) -> AdHoc {
AdHoc::on_liftoff("Liftoff notifier", |_| {
Box::pin(async move { notify.notify_one() })
})
}
fn setup_network_monitor<'a>(
config: &'a Config,
rocket: &Rocket<Ignite>,
) -> Option<NetworkMonitorRunnables> {
) -> Option<NetworkMonitorBuilder<'a>> {
if !config.get_network_monitor_enabled() {
return None;
}
@@ -236,19 +245,16 @@ async fn setup_network_monitor(
let v6_topology = parse_topology_file(config.get_v6_good_topology_file());
network_monitor::check_if_up_to_date(&v4_topology, &v6_topology);
Some(
new_monitor_runnables(
config,
v4_topology,
v6_topology,
node_status_storage,
validator_cache,
)
.await,
)
Some(NetworkMonitorBuilder::new(
config,
v4_topology,
v6_topology,
node_status_storage,
validator_cache,
))
}
async fn setup_rocket(config: &Config) -> Result<Rocket<Ignite>> {
async fn setup_rocket(config: &Config, liftoff_notify: Arc<Notify>) -> Result<Rocket<Ignite>> {
// let's build our rocket!
let rocket_config = rocket::config::Config {
// TODO: probably the port should be configurable?
@@ -257,6 +263,7 @@ async fn setup_rocket(config: &Config) -> Result<Rocket<Ignite>> {
};
let rocket = rocket::custom(rocket_config)
.attach(setup_cors()?)
.attach(setup_liftoff_notify(liftoff_notify))
.attach(ValidatorCache::stage())
.attach(InternalSignRequest::stage(config.keypair()));
@@ -296,10 +303,11 @@ async fn main() -> Result<()> {
let matches = parse_args();
let config = override_config(config, &matches);
let liftoff_notify = Arc::new(Notify::new());
// let's build our rocket!
let rocket = setup_rocket(&config).await?;
let monitor_runnables = setup_network_monitor(&config, &rocket).await;
let rocket = setup_rocket(&config, Arc::clone(&liftoff_notify)).await?;
let monitor_builder = setup_network_monitor(&config, &rocket);
let validator_cache = rocket.state::<ValidatorCache>().unwrap().clone();
@@ -327,19 +335,24 @@ async fn main() -> Result<()> {
tokio::spawn(async move { validator_cache_refresher.run().await });
}
if let Some(runnables) = monitor_runnables {
// launch the rocket!
let shutdown_handle = rocket.shutdown();
tokio::spawn(rocket.launch());
// to finish building our monitor, we need to have rocket up and running so that we could
// obtain our bandwidth credential
if let Some(monitor_builder) = monitor_builder {
info!("Starting network monitor...");
// spawn network monitor!
// wait for rocket's liftoff stage
liftoff_notify.notified().await;
// we're ready to go! spawn the network monitor!
let runnables = monitor_builder.build().await;
runnables.spawn_tasks();
} else {
info!("Network monitoring is disabled.");
}
// and launch the rocket
let shutdown_handle = rocket.shutdown();
tokio::spawn(rocket.launch());
wait_for_interrupt().await;
shutdown_handle.notify();
+97 -79
View File
@@ -31,6 +31,103 @@ pub(crate) mod monitor;
pub(crate) mod test_packet;
pub(crate) mod tested_network;
pub(crate) struct NetworkMonitorBuilder<'a> {
config: &'a Config,
tested_network: TestedNetwork,
node_status_storage: NodeStatusStorage,
validator_cache: ValidatorCache,
}
impl<'a> NetworkMonitorBuilder<'a> {
pub(crate) fn new(
config: &'a Config,
v4_topology: NymTopology,
v6_topology: NymTopology,
node_status_storage: NodeStatusStorage,
validator_cache: ValidatorCache,
) -> Self {
let tested_network = TestedNetwork::new_good(v4_topology, v6_topology);
NetworkMonitorBuilder {
config,
tested_network,
node_status_storage,
validator_cache,
}
}
pub(crate) async fn build(self) -> NetworkMonitorRunnables {
// TODO: in the future I guess this should somehow change to distribute the load
let tested_mix_gateway = self.tested_network.main_v4_gateway().clone();
info!(
"* gateway for testing mixnodes: {}",
tested_mix_gateway.identity_key.to_base58_string()
);
// TODO: those keys change constant throughout the whole execution of the monitor.
// and on top of that, they are used with ALL the gateways -> presumably this should change
// in the future
let mut rng = rand::rngs::OsRng;
let identity_keypair = Arc::new(identity::KeyPair::new(&mut rng));
let encryption_keypair = Arc::new(encryption::KeyPair::new(&mut rng));
let test_mixnode_sender = Recipient::new(
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
tested_mix_gateway.identity_key,
);
let (gateway_status_update_sender, gateway_status_update_receiver) = mpsc::unbounded();
let (received_processor_sender_channel, received_processor_receiver_channel) =
mpsc::unbounded();
let packet_preparer = new_packet_preparer(
self.validator_cache,
self.tested_network.clone(),
test_mixnode_sender,
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
);
let bandwidth_credential =
TEMPORARY_obtain_bandwidth_credential(self.config, identity_keypair.public_key()).await;
let packet_sender = new_packet_sender(
self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
bandwidth_credential,
self.config.get_gateway_sending_rate(),
);
let received_processor = new_received_processor(
received_processor_receiver_channel,
Arc::clone(&encryption_keypair),
);
let summary_producer = new_summary_producer(self.config.get_detailed_report());
let packet_receiver = new_packet_receiver(
gateway_status_update_receiver,
received_processor_sender_channel,
);
let monitor = monitor::Monitor::new(
self.config,
packet_preparer,
packet_sender,
received_processor,
summary_producer,
self.node_status_storage,
self.tested_network,
);
NetworkMonitorRunnables {
monitor,
packet_receiver,
}
}
}
pub(crate) struct NetworkMonitorRunnables {
monitor: Monitor,
packet_receiver: PacketReceiver,
@@ -48,85 +145,6 @@ impl NetworkMonitorRunnables {
}
}
pub(crate) async fn new_monitor_runnables(
config: &Config,
v4_topology: NymTopology,
v6_topology: NymTopology,
node_status_storage: NodeStatusStorage,
validator_cache: ValidatorCache,
) -> NetworkMonitorRunnables {
// TODO: in the future I guess this should somehow change to distribute the load
let tested_mix_gateway = v4_topology.gateways()[0].clone();
info!(
"* gateway for testing mixnodes: {}",
tested_mix_gateway.identity_key.to_base58_string()
);
let tested_network = TestedNetwork::new_good(v4_topology, v6_topology);
// TODO: those keys change constant throughout the whole execution of the monitor.
// and on top of that, they are used with ALL the gateways -> presumably this should change
// in the future
let mut rng = rand::rngs::OsRng;
let identity_keypair = Arc::new(identity::KeyPair::new(&mut rng));
let encryption_keypair = Arc::new(encryption::KeyPair::new(&mut rng));
let test_mixnode_sender = Recipient::new(
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
tested_mix_gateway.identity_key,
);
let (gateway_status_update_sender, gateway_status_update_receiver) = mpsc::unbounded();
let (received_processor_sender_channel, received_processor_receiver_channel) =
mpsc::unbounded();
let packet_preparer = new_packet_preparer(
validator_cache,
tested_network.clone(),
test_mixnode_sender,
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
);
let bandwidth_credential =
TEMPORARY_obtain_bandwidth_credential(config, identity_keypair.public_key()).await;
let packet_sender = new_packet_sender(
config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
bandwidth_credential,
config.get_gateway_sending_rate(),
);
let received_processor = new_received_processor(
received_processor_receiver_channel,
Arc::clone(&encryption_keypair),
);
let summary_producer = new_summary_producer(config.get_detailed_report());
let packet_receiver = new_packet_receiver(
gateway_status_update_receiver,
received_processor_sender_channel,
);
let monitor = monitor::Monitor::new(
config,
packet_preparer,
packet_sender,
received_processor,
summary_producer,
node_status_storage,
tested_network,
);
NetworkMonitorRunnables {
monitor,
packet_receiver,
}
}
fn new_packet_preparer(
validator_cache: ValidatorCache,
tested_network: TestedNetwork,