feature: CancellationToken-based shutdowns (#5325)

* initial stub for ShutdownToken

* attempting to start using new ShutdownManager in NymNode

* migrated verloc tasks

* added custom shutdown signal registration

* integrated legacy task support

* migrated additional tasks inside nym-node

* removed import that's unused in wasm

* apply review comments

* windows fixes
This commit is contained in:
Jędrzej Stuczyński
2025-01-13 09:13:13 +00:00
committed by GitHub
parent 11d6ee2fdb
commit 102cd1033c
18 changed files with 515 additions and 191 deletions
Generated
+3
View File
@@ -6659,10 +6659,13 @@ dependencies = [
name = "nym-task"
version = "0.1.0"
dependencies = [
"cfg-if",
"futures",
"log",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasmtimer",
+3
View File
@@ -8,10 +8,13 @@ license.workspace = true
repository.workspace = true
[dependencies]
cfg-if = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "sync"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
+366
View File
@@ -0,0 +1,366 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{TaskClient, TaskManager};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::{CancellationToken, DropGuard};
use tokio_util::task::TaskTracker;
use tracing::{debug, info, trace};
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
pub const DEFAULT_MAX_SHUTDOWN_DURATION: Duration = Duration::from_secs(5);
/// Resolves an optional token name into a printable string.
///
/// Returns a copy of the contained name, or `"unknown"` when no name has been set.
pub fn token_name(name: &Option<String>) -> String {
    match name {
        Some(known) => known.clone(),
        None => String::from("unknown"),
    }
}
/// A wrapper around tokio's `CancellationToken` that adds optional `name` information to more
/// easily track down sources of shutdown.
#[derive(Debug, Default)]
pub struct ShutdownToken {
// human-readable label used in log output; `None` is rendered as "unknown" by `token_name`
name: Option<String>,
// the wrapped token that carries the actual cancellation state
inner: CancellationToken,
}
impl Clone for ShutdownToken {
    /// Produces a token that shares the cancellation state of `self`.
    ///
    /// A named token gets `-child` appended to its name on every clone. Once the name has
    /// reached `MAX_NAME_LENGTH` (or already is the overflow marker) it is replaced with
    /// `OVERFLOW_NAME`, so that repeated cloning cannot grow the name without bound.
    /// Unnamed tokens stay unnamed.
    fn clone(&self) -> Self {
        let name = self.name.as_ref().map(|parent| {
            let has_room =
                parent.as_str() != Self::OVERFLOW_NAME && parent.len() < Self::MAX_NAME_LENGTH;
            if has_room {
                format!("{parent}-child")
            } else {
                Self::OVERFLOW_NAME.to_string()
            }
        });

        ShutdownToken {
            name,
            inner: self.inner.clone(),
        }
    }
}
// Exposes the methods of the wrapped `CancellationToken` (such as `.cancel()` and
// `.cancelled()`) directly on a `ShutdownToken`.
impl Deref for ShutdownToken {
type Target = CancellationToken;
// Borrows the wrapped `CancellationToken`.
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ShutdownToken {
    /// Name length at (or past) which clones stop appending `-child` to the name.
    const MAX_NAME_LENGTH: usize = 128;
    /// Name given to clones once the parent name can no longer be extended.
    const OVERFLOW_NAME: &'static str = "reached maximum ShutdownToken children name depth";

    /// Creates a fresh root token with the provided name.
    pub fn new(name: impl Into<String>) -> Self {
        ShutdownToken {
            name: Some(name.into()),
            inner: CancellationToken::new(),
        }
    }

    // Builds `<current name>-<suffix>`, using `unknown` as the base for unnamed tokens.
    fn suffixed_name(&self, suffix: impl Into<String>) -> String {
        let suffix = suffix.into();
        match &self.name {
            Some(base) => format!("{base}-{suffix}"),
            None => format!("unknown-{suffix}"),
        }
    }

    /// Creates a ShutdownToken which will get cancelled whenever the current token gets cancelled.
    /// Unlike a cloned/forked ShutdownToken, cancelling a child token does not cancel the parent token.
    ///
    /// The child is named `<current name>-<child_suffix>`.
    #[must_use]
    pub fn child_token<S: Into<String>>(&self, child_suffix: S) -> Self {
        ShutdownToken {
            name: Some(self.suffixed_name(child_suffix)),
            inner: self.inner.child_token(),
        }
    }

    /// Creates a clone of the ShutdownToken which will get cancelled whenever the current token
    /// gets cancelled, and vice versa.
    ///
    /// The clone is named `<current name>-<child_suffix>`.
    #[must_use]
    pub fn clone_with_suffix<S: Into<String>>(&self, child_suffix: S) -> Self {
        // build the token directly rather than going through `Clone`, which would first
        // allocate a `-child` name only for it to be immediately overwritten
        ShutdownToken {
            name: Some(self.suffixed_name(child_suffix)),
            inner: self.inner.clone(),
        }
    }

    /// Exposed method with the old name for easier migration.
    /// It will eventually be removed so please try to use `.clone_with_suffix` instead.
    #[must_use]
    pub fn fork<S: Into<String>>(&self, child_suffix: S) -> Self {
        self.clone_with_suffix(child_suffix)
    }

    /// Exposed method with the old name for easier migration.
    /// It will eventually be removed so please try to use `.clone().named(name)` instead.
    #[must_use]
    pub fn fork_named<S: Into<String>>(&self, name: S) -> Self {
        self.clone().named(name)
    }

    /// Replaces the name of this token, leaving its cancellation state untouched.
    #[must_use]
    pub fn named<S: Into<String>>(mut self, name: S) -> Self {
        self.name = Some(name.into());
        self
    }

    /// Appends `-<suffix>` to the name of this token (or to `unknown` if it has no name).
    #[must_use]
    pub fn add_suffix<S: Into<String>>(self, suffix: S) -> Self {
        let name = self.suffixed_name(suffix);
        self.named(name)
    }

    /// Returned guard will cancel this token (and all its children) on drop unless disarmed.
    pub fn drop_guard(self) -> ShutdownDropGuard {
        ShutdownDropGuard {
            name: self.name,
            inner: self.inner.drop_guard(),
        }
    }

    /// Returns the name of this token, or `"unknown"` if none has been set.
    pub fn name(&self) -> String {
        token_name(&self.name)
    }

    /// Runs the provided future until it completes or until this token gets cancelled.
    ///
    /// Returns `Some(output)` if the future finished, and `None` if the token got cancelled first.
    pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
    where
        F: Future,
    {
        let res = self.inner.run_until_cancelled(fut).await;
        // `Some` means the future ran to completion - only report a cancellation
        // if one has actually happened
        if res.is_none() {
            trace!("'{}' got cancelled", self.name());
        }
        res
    }
}
/// Guard produced by `ShutdownToken::drop_guard`: a tokio-util `DropGuard` stored together with
/// the name of the token it was created from.
pub struct ShutdownDropGuard {
// name carried over from the originating `ShutdownToken`
name: Option<String>,
// the wrapped guard
inner: DropGuard,
}
// Gives read-only access to the wrapped `DropGuard`.
impl Deref for ShutdownDropGuard {
type Target = DropGuard;
// Borrows the wrapped `DropGuard`.
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl ShutdownDropGuard {
pub fn disarm(self) -> ShutdownToken {
ShutdownToken {
name: self.name,
inner: self.inner.disarm(),
}
}
pub fn name(&self) -> String {
token_name(&self.name)
}
}
/// Coordinates process shutdown: owns the root `ShutdownToken`, the set of futures whose
/// completion triggers cancellation, and the tracker of tasks to wait for afterwards.
pub struct ShutdownManager {
// token cancelled once any registered shutdown signal resolves
pub root_token: ShutdownToken,
// only set by `with_legacy_task_manager`; used to hand out `TaskClient`s via `subscribe_legacy`
legacy_task_manager: Option<TaskManager>,
// one task per future registered through `with_shutdown`; each cancels `root_token` when done
shutdown_signals: JoinSet<()>,
// the reason I'm not using a `JoinSet` is because it forces us to use futures with the same `::Output` type
tracker: TaskTracker,
// upper bound on how long `finish_shutdown` waits before it stops waiting for the tasks
max_shutdown_duration: Duration,
}
// Exposes the `TaskTracker` API directly on the manager (the manager derefs to its `tracker` field).
impl Deref for ShutdownManager {
type Target = TaskTracker;
// Borrows the internal `TaskTracker`.
fn deref(&self) -> &Self::Target {
&self.tracker
}
}
impl ShutdownManager {
pub fn new(root_token_name: impl Into<String>) -> Self {
let manager = ShutdownManager {
root_token: ShutdownToken::new(root_token_name),
legacy_task_manager: None,
shutdown_signals: Default::default(),
tracker: Default::default(),
max_shutdown_duration: Default::default(),
};
// we need to add an explicit watcher for the cancellation token being cancelled
// so that we could cancel all legacy tasks
let cancel_watcher = manager.root_token.clone();
manager.with_shutdown(async move { cancel_watcher.cancelled().await })
}
/// Enables support for tasks still using the legacy `TaskManager`/`TaskClient` pair.
///
/// The created `TaskManager` is named `<root token name>-legacy`. Its error and drop
/// receivers are taken out and watched: as soon as either of them yields, the root token
/// gets cancelled.
pub fn with_legacy_task_manager(mut self) -> Self {
    let legacy_name = format!("{}-legacy", self.root_token.name());
    let mut task_manager = TaskManager::default().named(legacy_name);

    let mut error_rx = task_manager.task_return_error_rx();
    let mut drop_rx = task_manager.task_drop_rx();
    self.legacy_task_manager = Some(task_manager);

    // resolves when a legacy task reports an error or a legacy task client gets dropped
    let legacy_signal = async move {
        tokio::select! {
            _ = error_rx.recv() => (),
            _ = drop_rx.recv() => (),
        }
        info!("received legacy shutdown signal");
    };
    self.with_shutdown(legacy_signal)
}
/// Registers the default set of shutdown signals.
///
/// On unix these are the interrupt (ctrl-c), terminate and quit signals;
/// on other (non-wasm) targets only the interrupt is registered.
///
/// # Errors
/// On unix, returns the `io::Error` produced when registering the terminate or quit
/// signal listener fails.
#[cfg(not(target_arch = "wasm32"))]
pub fn with_default_shutdown_signals(self) -> std::io::Result<Self> {
cfg_if::cfg_if! {
if #[cfg(unix)] {
self.with_interrupt_signal()
.with_terminate_signal()?
.with_quit_signal()
} else {
Ok(self.with_interrupt_signal())
}
}
}
/// Registers a custom shutdown trigger.
///
/// The provided future is spawned onto the internal set of shutdown signals;
/// once it resolves, the root token gets cancelled.
#[must_use]
pub fn with_shutdown<F>(mut self, shutdown: F) -> Self
where
    F: Future<Output = ()>,
    F: Send + 'static,
{
    let root = self.root_token.clone();
    let trigger = async move {
        shutdown.await;
        info!("sending cancellation after receiving shutdown signal");
        root.cancel();
    };
    self.shutdown_signals.spawn(trigger);
    self
}
/// Registers the given unix signal as a shutdown trigger.
///
/// # Errors
/// Returns the `io::Error` produced if the signal listener could not be created.
#[cfg(unix)]
pub fn with_shutdown_signal(self, signal_kind: SignalKind) -> std::io::Result<Self> {
    let mut listener = signal(signal_kind)?;
    let received = async move {
        listener.recv().await;
    };
    Ok(self.with_shutdown(received))
}
/// Registers ctrl-c as a shutdown trigger.
///
/// The result of waiting for ctrl-c is ignored: the trigger fires as soon as
/// the wait resolves, whether it succeeded or not.
#[cfg(not(target_arch = "wasm32"))]
pub fn with_interrupt_signal(self) -> Self {
    let interrupted = async move {
        let _ = tokio::signal::ctrl_c().await;
    };
    self.with_shutdown(interrupted)
}
/// Registers the unix terminate signal (`SignalKind::terminate()`) as a shutdown trigger.
///
/// # Errors
/// Propagates the `io::Error` returned by `with_shutdown_signal`.
#[cfg(unix)]
pub fn with_terminate_signal(self) -> std::io::Result<Self> {
self.with_shutdown_signal(SignalKind::terminate())
}
/// Registers the unix quit signal (`SignalKind::quit()`) as a shutdown trigger.
///
/// # Errors
/// Propagates the `io::Error` returned by `with_shutdown_signal`.
#[cfg(unix)]
pub fn with_quit_signal(self) -> std::io::Result<Self> {
self.with_shutdown_signal(SignalKind::quit())
}
#[must_use]
pub fn with_shutdown_duration(mut self, duration: Duration) -> Self {
self.max_shutdown_duration = duration;
self
}
/// Creates a child of the root token named `<root name>-<child_suffix>`.
/// Cancelling the returned token does not cancel the root token.
pub fn child_token<S: Into<String>>(&self, child_suffix: S) -> ShutdownToken {
self.root_token.child_token(child_suffix)
}
/// Creates a clone of the root token named `<root name>-<child_suffix>`.
/// Cancelling the returned token also cancels the root token.
pub fn clone_token<S: Into<String>>(&self, child_suffix: S) -> ShutdownToken {
self.root_token.clone_with_suffix(child_suffix)
}
/// Creates a legacy `TaskClient` by calling `subscribe_named(child_suffix)` on the internal
/// legacy `TaskManager`.
///
/// # Panics
/// Panics if legacy support has not been enabled, i.e. `with_legacy_task_manager`
/// was never called on this manager.
#[must_use]
pub fn subscribe_legacy<S: Into<String>>(&self, child_suffix: S) -> TaskClient {
// alternatively we could have set self.legacy_task_manager = Some(TaskManager::default());
// on demand if it wasn't available, but then we'd have to use a mutable reference
#[allow(clippy::expect_used)]
self.legacy_task_manager
.as_ref()
.expect("did not enable legacy shutdown support")
.subscribe_named(child_suffix)
}
// Waits for whichever of the following happens first:
//  - an interrupt (ctrl-c) is received (never resolves on wasm),
//  - `max_shutdown_duration` elapses,
//  - all tracked tasks (and legacy tasks, if enabled) have finished.
// Only the first future to complete is awaited; the others are dropped on return.
async fn finish_shutdown(mut self) {
// boxed trait objects so that the three differently-typed async blocks fit in one collection
let mut wait_futures = FuturesUnordered::<Pin<Box<dyn Future<Output = ()>>>>::new();
// force shutdown via ctrl-c
wait_futures.push(Box::pin(async move {
#[cfg(not(target_arch = "wasm32"))]
let interrupt_future = tokio::signal::ctrl_c();
#[cfg(target_arch = "wasm32")]
let interrupt_future = futures::future::pending::<()>();
let _ = interrupt_future.await;
info!("received interrupt - forcing shutdown");
}));
// timeout
wait_futures.push(Box::pin(async move {
sleep(self.max_shutdown_duration).await;
info!("timeout reached, forcing shutdown");
}));
// graceful
wait_futures.push(Box::pin(async move {
// tasks registered on the tracker are waited for first, legacy tasks second
self.tracker.wait().await;
debug!("migrated tasks successfully shutdown");
if let Some(legacy) = self.legacy_task_manager.as_mut() {
legacy.wait_for_graceful_shutdown().await;
debug!("legacy tasks successfully shutdown");
}
info!("all registered tasks successfully shutdown")
}));
// the first of the three futures to resolve ends the wait
wait_futures.next().await;
}
/// Waits until the first of the registered shutdown signal tasks completes and then
/// drives the shutdown: legacy tasks (if enabled) are signalled, after which the
/// manager waits in `finish_shutdown` for the tasks to finish, for the timeout,
/// or for an interrupt.
pub async fn wait_for_shutdown_signal(mut self) {
// resolves as soon as any single shutdown signal task finishes
self.shutdown_signals.join_next().await;
if let Some(legacy_manager) = self.legacy_task_manager.as_mut() {
info!("attempting to shutdown legacy tasks");
// the result of signalling is deliberately ignored
let _ = legacy_manager.signal_shutdown();
}
info!("waiting for tasks to finish... (press ctrl-c to force)");
self.finish_shutdown().await;
}
}
+6 -3
View File
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod cancellation;
pub mod connections;
pub mod event;
pub mod manager;
@@ -8,9 +9,11 @@ pub mod manager;
pub mod signal;
pub mod spawn;
pub use cancellation::{ShutdownDropGuard, ShutdownManager, ShutdownToken};
pub use event::{StatusReceiver, StatusSender, TaskStatus, TaskStatusEvent};
pub use manager::{TaskClient, TaskHandle, TaskManager};
#[cfg(not(target_arch = "wasm32"))]
pub use signal::wait_for_signal_and_error;
pub use spawn::{spawn, spawn_with_report_error};
pub use tokio_util::task::TaskTracker;
#[cfg(not(target_arch = "wasm32"))]
pub use signal::{wait_for_signal, wait_for_signal_and_error};
+20
View File
@@ -185,6 +185,19 @@ impl TaskManager {
}
}
// used for compatibility with the ShutdownManager
//
// Takes the error receiver out of the manager, leaving `None` in its place.
// Panics if the receiver has already been taken.
pub(crate) fn task_return_error_rx(&mut self) -> ErrorReceiver {
self.task_return_error_rx
.take()
.expect("unable to get error channel: attempt to wait twice?")
}
// Takes the task-drop receiver out of the manager, leaving `None` in its place.
// Panics if the receiver has already been taken.
pub(crate) fn task_drop_rx(&mut self) -> ErrorReceiver {
self.task_drop_rx
.take()
.expect("unable to get task drop channel: attempt to wait twice?")
}
pub async fn wait_for_error(&mut self) -> Option<SentError> {
let mut error_rx = self
.task_return_error_rx
@@ -208,6 +221,13 @@ impl TaskManager {
}
}
// Drops the manager's own `notify_rx` (if it is still held) and then waits until
// `notify_tx` reports the channel as closed.
// NOTE(review): presumably that happens once every remaining receiver (i.e. every task
// client) has been dropped - confirm against the channel type used for `notify_tx`.
pub(crate) async fn wait_for_graceful_shutdown(&mut self) {
if let Some(notify_rx) = self.notify_rx.take() {
drop(notify_rx);
}
self.notify_tx.closed().await
}
pub async fn wait_for_shutdown(&mut self) {
log::debug!("Waiting for shutdown");
if let Some(notify_rx) = self.notify_rx.take() {
+28 -53
View File
@@ -6,7 +6,7 @@ use crate::measurements::packet::{EchoPacket, ReplyPacket};
use bytes::{BufMut, BytesMut};
use futures::StreamExt;
use nym_crypto::asymmetric::identity;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{io, process};
@@ -19,19 +19,19 @@ use tracing::{debug, error, info, trace, warn};
pub struct PacketListener {
address: SocketAddr,
connection_handler: Arc<ConnectionHandler>,
shutdown: TaskClient,
shutdown_token: ShutdownToken,
}
impl PacketListener {
pub fn new(
address: SocketAddr,
identity: Arc<identity::KeyPair>,
shutdown: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
PacketListener {
address,
connection_handler: Arc::new(ConnectionHandler { identity }),
shutdown,
shutdown_token,
}
}
}
@@ -51,26 +51,22 @@ impl PacketListener {
info!("Started listening for echo packets on {}", self.address);
let mut shutdown_listener = self.shutdown.clone();
while !shutdown_listener.is_shutdown() {
while !self.shutdown_token.is_cancelled() {
// cloning the arc as each accepted socket is handled in separate task
let connection_handler = Arc::clone(&self.connection_handler);
let mut handler_shutdown_listener = self.shutdown.clone();
handler_shutdown_listener.disarm();
tokio::select! {
socket = listener.accept() => {
match socket {
Ok((socket, remote_addr)) => {
debug!("New verloc connection from {}", remote_addr);
tokio::spawn(connection_handler.handle_connection(socket, remote_addr, handler_shutdown_listener));
debug!("New verloc connection from {remote_addr}");
let cancel = self.shutdown_token.child_token(format!("handler_{remote_addr}"));
tokio::spawn(async move { cancel.run_until_cancelled(connection_handler.handle_connection(socket, remote_addr)).await });
}
Err(err) => warn!("Failed to accept incoming connection - {err}"),
}
},
_ = shutdown_listener.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("PacketListener: Received shutdown");
}
}
@@ -88,50 +84,29 @@ impl ConnectionHandler {
packet.construct_reply(self.identity.private_key())
}
pub(crate) async fn handle_connection(
self: Arc<Self>,
conn: TcpStream,
remote: SocketAddr,
mut shutdown_listener: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
pub(crate) async fn handle_connection(self: Arc<Self>, conn: TcpStream, remote: SocketAddr) {
debug!("Starting connection handler for {remote}");
let mut framed_conn = Framed::new(conn, EchoPacketCodec);
while !shutdown_listener.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_listener.recv() => {
trace!("ConnectionHandler: Shutdown received");
while let Some(echo_packet) = framed_conn.next().await {
let reply_packet = match echo_packet {
Ok(echo_packet) => self.handle_echo_packet(echo_packet),
Err(err) => {
debug!(
"The socket connection got corrupted with error: {err}. Closing the socket"
);
return;
}
maybe_echo_packet = framed_conn.next() => {
// handle echo packet
let reply_packet = match maybe_echo_packet {
Some(Ok(echo_packet)) => self.handle_echo_packet(echo_packet),
Some(Err(err)) => {
debug!(
"The socket connection got corrupted with error: {err}. Closing the socket",
);
return;
}
None => {
debug!("The socket connection got terminated by the remote!");
return;
}
};
};
// write back the reply (note the lack of framing)
if let Err(err) = framed_conn
.get_mut()
.write_all(reply_packet.to_bytes().as_ref())
.await
{
debug!(
"Failed to write reply packet back to the sender - {}. Closing the socket on our end",
err
);
return;
}
},
// write back the reply (note the lack of framing)
if let Err(err) = framed_conn
.get_mut()
.write_all(reply_packet.to_bytes().as_ref())
.await
{
debug!("Failed to write reply packet back to the sender: {err}. Closing the socket on our end");
return;
}
}
}
+10 -13
View File
@@ -8,7 +8,7 @@ use crate::models::VerlocNodeResult;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use nym_crypto::asymmetric::identity;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use nym_validator_client::models::NymNodeDescription;
use nym_validator_client::NymApiClient;
use rand::prelude::SliceRandom;
@@ -23,7 +23,7 @@ pub struct VerlocMeasurer {
config: Config,
packet_sender: Arc<PacketSender>,
packet_listener: Arc<PacketListener>,
shutdown_listener: TaskClient,
shutdown_token: ShutdownToken,
state: SharedVerlocStats,
}
@@ -31,7 +31,7 @@ impl VerlocMeasurer {
pub fn new(
config: Config,
identity: Arc<identity::KeyPair>,
shutdown_listener: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
VerlocMeasurer {
packet_sender: Arc::new(PacketSender::new(
@@ -40,14 +40,14 @@ impl VerlocMeasurer {
config.packet_timeout,
config.connection_timeout,
config.delay_between_packets,
shutdown_listener.clone().named("VerlocPacketSender"),
shutdown_token.clone_with_suffix("packet_sender"),
)),
packet_listener: Arc::new(PacketListener::new(
config.listening_address,
Arc::clone(&identity),
shutdown_listener.clone().named("VerlocPacketListener"),
shutdown_token.clone_with_suffix("packet_listener"),
)),
shutdown_listener,
shutdown_token,
config,
state: SharedVerlocStats::default(),
}
@@ -69,9 +69,6 @@ impl VerlocMeasurer {
return MeasurementOutcome::Done;
}
let mut shutdown_listener = self.shutdown_listener.clone().named("VerlocMeasurement");
shutdown_listener.disarm();
for chunk in nodes_to_test.chunks(self.config.tested_nodes_batch_size) {
let mut chunk_results = Vec::with_capacity(chunk.len());
@@ -95,7 +92,7 @@ impl VerlocMeasurer {
.collect::<FuturesUnordered<_>>();
// exhaust the results
while !shutdown_listener.is_shutdown() {
while !self.shutdown_token.is_cancelled() {
tokio::select! {
measurement_result = measurement_chunk.next() => {
let Some(result) = measurement_result else {
@@ -120,7 +117,7 @@ impl VerlocMeasurer {
};
chunk_results.push(VerlocNodeResult::new(identity, measurement_result));
},
_ = shutdown_listener.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("Shutdown received while measuring");
return MeasurementOutcome::Shutdown;
}
@@ -155,7 +152,7 @@ impl VerlocMeasurer {
pub async fn run(&mut self) {
self.start_listening();
while !self.shutdown_listener.is_shutdown() {
while !self.shutdown_token.is_cancelled() {
info!("Starting verloc measurements");
// TODO: should we also measure gateways?
@@ -209,7 +206,7 @@ impl VerlocMeasurer {
tokio::select! {
_ = sleep(self.config.testing_interval) => {},
_ = self.shutdown_listener.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("Shutdown received while sleeping");
}
}
+6 -9
View File
@@ -5,7 +5,7 @@ use crate::error::VerlocError;
use crate::measurements::packet::{EchoPacket, ReplyPacket};
use crate::models::VerlocMeasurement;
use nym_crypto::asymmetric::ed25519;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use rand::{thread_rng, Rng};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -45,7 +45,7 @@ pub struct PacketSender {
packet_timeout: Duration,
connection_timeout: Duration,
delay_between_packets: Duration,
shutdown_listener: TaskClient,
shutdown_token: ShutdownToken,
}
impl PacketSender {
@@ -55,7 +55,7 @@ impl PacketSender {
packet_timeout: Duration,
connection_timeout: Duration,
delay_between_packets: Duration,
shutdown_listener: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
PacketSender {
identity,
@@ -63,7 +63,7 @@ impl PacketSender {
packet_timeout,
connection_timeout,
delay_between_packets,
shutdown_listener,
shutdown_token,
}
}
@@ -83,9 +83,6 @@ impl PacketSender {
self: Arc<Self>,
tested_node: TestedNode,
) -> Result<VerlocMeasurement, VerlocError> {
let mut shutdown_listener = self.shutdown_listener.fork(tested_node.address.to_string());
shutdown_listener.disarm();
let mut conn = match tokio::time::timeout(
self.connection_timeout,
TcpStream::connect(tested_node.address),
@@ -148,7 +145,7 @@ impl PacketSender {
Ok(Ok(_)) => {}
}
},
_ = shutdown_listener.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("PacketSender: Received shutdown while sending");
return Err(VerlocError::ShutdownReceived);
},
@@ -190,7 +187,7 @@ impl PacketSender {
}
}
},
_ = shutdown_listener.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("PacketSender: Received shutdown while waiting for reply");
return Err(VerlocError::ShutdownReceived);
}
+6
View File
@@ -50,6 +50,12 @@ pub enum NymNodeError {
#[error("this binary version no longer supports migration from legacy mixnodes and gateways")]
UnsupportedMigration,
#[error("failed to initialise shutdown signals: {source}")]
ShutdownSignalFailure {
#[source]
source: io::Error,
},
#[error("could not find an existing config file at '{}' and fresh node initialisation has been disabled", config_path.display())]
ForbiddenInitialisation { config_path: PathBuf },
+1 -46
View File
@@ -6,9 +6,7 @@ use axum::extract::ConnectInfo;
use axum::middleware::AddExtension;
use axum::serve::Serve;
use axum::Router;
use nym_task::TaskClient;
use std::net::SocketAddr;
use tracing::{debug, error};
pub use router::{api, HttpServerConfig, NymNodeRouter};
@@ -19,47 +17,4 @@ pub mod state;
type InnerService = IntoMakeServiceWithConnectInfo<Router, SocketAddr>;
type ConnectInfoExt = AddExtension<Router, ConnectInfo<SocketAddr>>;
pub type ServeService = Serve<InnerService, ConnectInfoExt>;
pub struct NymNodeHttpServer {
task_client: Option<TaskClient>,
inner: ServeService,
}
impl NymNodeHttpServer {
pub(crate) fn new(inner: ServeService) -> Self {
NymNodeHttpServer {
task_client: None,
inner,
}
}
#[must_use]
pub fn with_task_client(mut self, task_client: TaskClient) -> Self {
self.task_client = Some(task_client);
self
}
async fn run_server_forever(server: ServeService) {
if let Err(err) = server.await {
error!("the HTTP server has terminated with the error: {err}");
} else {
error!("the HTTP server has terminated with producing any errors");
}
}
pub async fn run(self) {
if let Some(mut task_client) = self.task_client {
tokio::select! {
_ = task_client.recv_with_delay() => {
debug!("NymNodeHTTPServer: Received shutdown");
}
_ = Self::run_server_forever(self.inner) => { }
}
} else {
Self::run_server_forever(self.inner).await
}
debug!("NymNodeHTTPServer: Exiting");
}
}
pub type NymNodeHttpServer = Serve<InnerService, ConnectInfoExt>;
+2 -4
View File
@@ -175,12 +175,10 @@ impl NymNodeRouter {
source,
})?;
let axum_server = axum::serve(
Ok(axum::serve(
listener,
self.inner
.into_make_service_with_connect_info::<SocketAddr>(),
);
Ok(NymNodeHttpServer::new(axum_server))
))
}
}
+4 -3
View File
@@ -6,6 +6,7 @@ use futures::StreamExt;
use nym_node_metrics::events::{
events_channels, MetricEventsReceiver, MetricEventsSender, MetricsEvent,
};
use nym_task::ShutdownToken;
use std::any;
use std::any::TypeId;
use std::collections::HashMap;
@@ -24,11 +25,11 @@ pub(crate) struct MetricsAggregator {
// registered_handlers: HashMap<TypeId, Box<dyn Any + Send + Sync + 'static>>,
event_sender: MetricEventsSender,
event_receiver: MetricEventsReceiver,
shutdown: nym_task::TaskClient,
shutdown: ShutdownToken,
}
impl MetricsAggregator {
pub fn new(handlers_update_interval: Duration, shutdown: nym_task::TaskClient) -> Self {
pub fn new(handlers_update_interval: Duration, shutdown: ShutdownToken) -> Self {
let (event_sender, event_receiver) = events_channels();
MetricsAggregator {
@@ -113,7 +114,7 @@ impl MetricsAggregator {
loop {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
_ = self.shutdown.cancelled() => {
debug!("MetricsAggregator: Received shutdown");
break;
}
+6 -5
View File
@@ -4,7 +4,7 @@
use human_repr::HumanCount;
use human_repr::HumanThroughput;
use nym_node_metrics::NymNodeMetrics;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::task::JoinHandle;
@@ -49,14 +49,14 @@ pub(crate) struct ConsoleLogger {
logging_delay: Duration,
at_last_update: AtLastUpdate,
metrics: NymNodeMetrics,
shutdown: TaskClient,
shutdown: ShutdownToken,
}
impl ConsoleLogger {
pub(crate) fn new(
logging_delay: Duration,
metrics: NymNodeMetrics,
shutdown: TaskClient,
shutdown: ShutdownToken,
) -> Self {
ConsoleLogger {
logging_delay,
@@ -126,11 +126,12 @@ impl ConsoleLogger {
async fn run(&mut self) {
trace!("Starting ConsoleLogger");
let mut interval = interval_at(Instant::now() + self.logging_delay, self.logging_delay);
while !self.shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
_ = self.shutdown.cancelled() => {
trace!("ConsoleLogger: Received shutdown");
break
}
_ = interval.tick() => self.log_running_stats().await,
};
+6 -7
View File
@@ -38,10 +38,7 @@ impl ConnectionHandler {
tcp_stream: TcpStream,
remote_address: SocketAddr,
) -> Self {
let mut task_client = shared.task_client.fork(remote_address.to_string());
// we don't want dropped connections to cause global shutdown
task_client.disarm();
let shutdown = shared.shutdown.child_token(remote_address.to_string());
shared.metrics.network.new_active_ingress_mixnet_client();
ConnectionHandler {
@@ -51,7 +48,7 @@ impl ConnectionHandler {
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
metrics: shared.metrics.clone(),
task_client,
shutdown,
},
remote_address,
mixnet_connection: Framed::new(tcp_stream, NymCodec),
@@ -165,11 +162,12 @@ impl ConnectionHandler {
)
)]
pub(crate) async fn handle_stream(&mut self) {
while !self.shared.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.shared.task_client.recv() => {
_ = self.shared.shutdown.cancelled() => {
trace!("connection handler: received shutdown");
break
}
maybe_framed_nym_packet = self.mixnet_connection.next() => {
match maybe_framed_nym_packet {
@@ -186,6 +184,7 @@ impl ConnectionHandler {
}
}
}
debug!("exiting and closing connection");
}
}
+6 -10
View File
@@ -2,14 +2,14 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::mixnet::SharedData;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::net::SocketAddr;
use tokio::task::JoinHandle;
use tracing::{error, info, trace};
pub(crate) struct Listener {
bind_address: SocketAddr,
shutdown: TaskClient,
shutdown: ShutdownToken,
shared_data: SharedData,
}
@@ -17,7 +17,7 @@ impl Listener {
pub(crate) fn new(bind_address: SocketAddr, shared_data: SharedData) -> Self {
Listener {
bind_address,
shutdown: shared_data.task_client.fork("socket-listener"),
shutdown: shared_data.shutdown.clone_with_suffix("socket-listener"),
shared_data,
}
}
@@ -29,19 +29,15 @@ impl Listener {
Ok(listener) => listener,
Err(err) => {
error!("Failed to bind to {}: {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.bind_address);
// that's a bit gnarly, but we need to make sure we trigger shutdown
let mut shutdown_bomb = self.shutdown.fork("shutdown-bomb");
shutdown_bomb.rearm();
drop(shutdown_bomb);
self.shutdown.cancel();
return;
}
};
while !self.shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
_ = self.shutdown.cancelled() => {
trace!("mixnet listener: received shutdown");
}
connection = tcp_listener.accept() => {
@@ -9,6 +9,7 @@ use nym_mixnet_client::SendWithoutResponse;
use nym_node_metrics::NymNodeMetrics;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue};
use nym_sphinx_forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use std::io;
use tokio::time::Instant;
use tracing::{debug, error, trace, warn};
@@ -21,11 +22,11 @@ pub struct PacketForwarder<C> {
packet_sender: MixForwardingSender,
packet_receiver: MixForwardingReceiver,
shutdown: nym_task::TaskClient,
shutdown: ShutdownToken,
}
impl<C> PacketForwarder<C> {
pub fn new(client: C, metrics: NymNodeMetrics, shutdown: nym_task::TaskClient) -> Self {
pub fn new(client: C, metrics: NymNodeMetrics, shutdown: ShutdownToken) -> Self {
let (packet_sender, packet_receiver) = mix_forwarding_channels();
PacketForwarder {
@@ -123,7 +124,7 @@ impl<C> PacketForwarder<C> {
loop {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
_ = self.shutdown.cancelled() => {
debug!("PacketForwarder: Received shutdown");
break;
}
+6 -10
View File
@@ -11,7 +11,7 @@ use nym_node_metrics::NymNodeMetrics;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::processing::{MixProcessingResult, PacketProcessingError};
use nym_sphinx_types::DestinationAddressBytes;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
@@ -43,7 +43,6 @@ impl ProcessingConfig {
}
// explicitly do NOT derive clone as we want to manually apply relevant suffixes to the task clients
// as well as immediately disarm them
pub(crate) struct SharedData {
pub(super) processing_config: ProcessingConfig,
// TODO: this type is not `Zeroize` : (
@@ -56,7 +55,7 @@ pub(crate) struct SharedData {
pub(super) final_hop: SharedFinalHopData,
pub(super) metrics: NymNodeMetrics,
pub(super) task_client: TaskClient,
pub(super) shutdown: ShutdownToken,
}
impl SharedData {
@@ -66,7 +65,7 @@ impl SharedData {
mixnet_forwarder: MixForwardingSender,
final_hop: SharedFinalHopData,
metrics: NymNodeMetrics,
task_client: TaskClient,
shutdown: ShutdownToken,
) -> Self {
SharedData {
processing_config,
@@ -74,7 +73,7 @@ impl SharedData {
mixnet_forwarder,
final_hop,
metrics,
task_client,
shutdown,
}
}
@@ -144,13 +143,10 @@ impl SharedData {
.mixnet_forwarder
.forward_packet(PacketToForward::new(packet, delay_until))
.is_err()
&& !self.task_client.is_shutdown()
&& !self.shutdown.is_cancelled()
{
error!("failed to forward sphinx packet on the channel while the process is not going through the shutdown!");
// this is a critical error, we're in uncharted lands, we have to shut down
let mut shutdown_bomb = self.task_client.fork("shutdown bomb");
shutdown_bomb.rearm();
drop(shutdown_bomb)
self.shutdown.cancel();
}
}
+32 -25
View File
@@ -43,7 +43,7 @@ use nym_node_metrics::NymNodeMetrics;
use nym_node_requests::api::v1::node::models::{AnnouncePorts, NodeDescription};
use nym_sphinx_acknowledgements::AckKey;
use nym_sphinx_addressing::Recipient;
use nym_task::{TaskClient, TaskManager};
use nym_task::{ShutdownManager, ShutdownToken, TaskClient};
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::NodeRefreshBody;
use nym_validator_client::{NymApiClient, UserAgent};
@@ -351,6 +351,7 @@ impl From<WireguardData> for nym_wireguard::WireguardData {
pub(crate) struct NymNode {
config: Config,
accepted_operator_terms_and_conditions: bool,
shutdown_manager: ShutdownManager,
description: NodeDescription,
@@ -447,6 +448,10 @@ impl NymNode {
wireguard: Some(wireguard_data),
config,
accepted_operator_terms_and_conditions: false,
shutdown_manager: ShutdownManager::new("NymNode")
.with_legacy_task_manager()
.with_default_shutdown_signals()
.map_err(|source| NymNodeError::ShutdownSignalFailure { source })?,
})
}
@@ -823,7 +828,7 @@ impl NymNode {
}
}
pub(crate) fn start_verloc_measurements(&self, shutdown: TaskClient) {
pub(crate) fn start_verloc_measurements(&self) {
info!(
"Starting the [verloc] round-trip-time measurer on {} ...",
self.config.verloc.bind_address
@@ -845,8 +850,11 @@ impl NymNode {
.retry_timeout(self.config.verloc.debug.retry_timeout)
.build();
let mut verloc_measurer =
VerlocMeasurer::new(config, self.ed25519_identity_keys.clone(), shutdown);
let mut verloc_measurer = VerlocMeasurer::new(
config,
self.ed25519_identity_keys.clone(),
self.shutdown_manager.clone_token("verloc"),
);
verloc_measurer.set_shared_state(self.verloc_stats.clone());
tokio::spawn(async move { verloc_measurer.run().await });
}
@@ -855,14 +863,14 @@ impl NymNode {
&self,
active_clients_store: ActiveClientsStore,
active_egress_mixnet_connections: ActiveConnections,
shutdown: TaskClient,
shutdown: ShutdownToken,
) -> MetricEventsSender {
info!("setting up node metrics...");
// aggregator (to listen for any metrics events)
let mut metrics_aggregator = MetricsAggregator::new(
self.config.metrics.debug.aggregator_update_rate,
shutdown.fork("aggregator"),
shutdown.clone_with_suffix("aggregator"),
);
// >>>> START: register all relevant handlers for custom events
@@ -924,12 +932,9 @@ impl NymNode {
ConsoleLogger::new(
self.config.metrics.debug.console_logging_update_interval,
self.metrics.clone(),
shutdown.named("metrics-console-logger"),
shutdown.clone_with_suffix("metrics-console-logger"),
)
.start();
} else {
let mut shutdown = shutdown;
shutdown.disarm()
}
let events_sender = metrics_aggregator.sender();
@@ -943,7 +948,7 @@ impl NymNode {
pub(crate) fn start_mixnet_listener(
&self,
active_clients_store: &ActiveClientsStore,
shutdown: TaskClient,
shutdown: ShutdownToken,
) -> (MixForwardingSender, ActiveConnections) {
let processing_config = ProcessingConfig::new(&self.config);
@@ -972,7 +977,7 @@ impl NymNode {
let mut packet_forwarder = PacketForwarder::new(
mixnet_client,
self.metrics.clone(),
shutdown.fork("mix-packet-forwarder"),
shutdown.clone_with_suffix("mix-packet-forwarder"),
);
let mix_packet_sender = packet_forwarder.sender();
tokio::spawn(async move { packet_forwarder.run().await });
@@ -1005,45 +1010,47 @@ impl NymNode {
);
debug!("config: {:#?}", self.config);
let mut task_manager = TaskManager::default().named("NymNode");
let http_server = self
.build_http_server()
.await?
.with_task_client(task_manager.subscribe_named("http-server"));
let http_server = self.build_http_server().await?;
let bind_address = self.config.http.bind_address;
tokio::spawn(async move {
let server_shutdown = self.shutdown_manager.clone_token("http-server");
self.shutdown_manager.spawn(async move {
{
info!("started NymNodeHTTPServer on {bind_address}");
http_server.run().await
info!("starting NymNodeHTTPServer on {bind_address}");
http_server
.with_graceful_shutdown(async move { server_shutdown.cancelled().await })
.await
}
});
self.try_refresh_remote_nym_api_cache().await;
self.start_verloc_measurements(task_manager.subscribe_named("verloc-measurements"));
self.start_verloc_measurements();
let active_clients_store = ActiveClientsStore::new();
let (mix_packet_sender, active_egress_mixnet_connections) = self.start_mixnet_listener(
&active_clients_store,
task_manager.subscribe_named("mixnet-traffic"),
self.shutdown_manager.clone_token("mixnet-traffic"),
);
let metrics_sender = self.setup_metrics_backend(
active_clients_store.clone(),
active_egress_mixnet_connections,
task_manager.subscribe_named("metrics"),
self.shutdown_manager.clone_token("metrics"),
);
self.start_gateway_tasks(
metrics_sender,
active_clients_store,
mix_packet_sender,
task_manager.subscribe_named("gateway-tasks"),
self.shutdown_manager.subscribe_legacy("gateway-tasks"),
)
.await?;
let _ = task_manager.catch_interrupt().await;
self.shutdown_manager.close();
self.shutdown_manager.wait_for_shutdown_signal().await;
Ok(())
}
}