feat: make sure any terminated task kills the watcher and write run info to db (#5517)
* feat: make sure any terminated task kills the watcher and write run info to db * updated chain watcher version
This commit is contained in:
committed by
GitHub
parent
a69aa23609
commit
1a334b575d
Generated
+1
-1
@@ -7222,7 +7222,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.11"
|
||||
version = "0.1.12"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
||||
Generated
+12
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "\n INSERT INTO watcher_execution(start, end, error_message)\n VALUES (?, ?, ?)\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 3
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "1aa7733ad4bbf3e6b8db909b8646bee247bc021b9534f1d4b0fcad32e2e56218"
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.11"
|
||||
version = "0.1.12"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: GPL-3.0-only
|
||||
*/
|
||||
|
||||
CREATE TABLE watcher_execution
|
||||
(
|
||||
start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
error_message TEXT
|
||||
)
|
||||
@@ -2,19 +2,112 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use tokio::join;
|
||||
use tracing::{error, info, trace};
|
||||
use anyhow::Context;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
|
||||
mod args;
|
||||
mod config;
|
||||
|
||||
use crate::chain_scraper::run_chain_scraper;
|
||||
use crate::db::DbPool;
|
||||
use crate::{db, http, payment_listener, price_scraper};
|
||||
pub(crate) use args::Args;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
async fn try_insert_watcher_execution_information(
|
||||
db_pool: DbPool,
|
||||
start: OffsetDateTime,
|
||||
end: OffsetDateTime,
|
||||
error_message: Option<String>,
|
||||
) {
|
||||
let _ = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO watcher_execution(start, end, error_message)
|
||||
VALUES (?, ?, ?)
|
||||
"#,
|
||||
start,
|
||||
end,
|
||||
error_message
|
||||
)
|
||||
.execute(&db_pool)
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to insert run information: {err}"));
|
||||
}
|
||||
|
||||
async fn wait_for_shutdown(
|
||||
db_pool: DbPool,
|
||||
start: OffsetDateTime,
|
||||
main_cancellation_token: CancellationToken,
|
||||
scraper_cancellation_token: CancellationToken,
|
||||
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
|
||||
) {
|
||||
async fn finalize_shutdown(
|
||||
db_pool: DbPool,
|
||||
start: OffsetDateTime,
|
||||
main_cancellation_token: CancellationToken,
|
||||
scraper_cancellation_token: CancellationToken,
|
||||
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
|
||||
error_message: Option<String>,
|
||||
) {
|
||||
// cancel all tasks
|
||||
main_cancellation_token.cancel();
|
||||
scraper_cancellation_token.cancel();
|
||||
|
||||
// stupid nasty and hacky workaround to make sure all relevant tasks have finished before hard aborting them
|
||||
// nasty stupid and hacky workaround
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tasks.abort_all();
|
||||
|
||||
// insert execution result into the db
|
||||
try_insert_watcher_execution_information(
|
||||
db_pool,
|
||||
start,
|
||||
OffsetDateTime::now_utc(),
|
||||
error_message,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
// graceful shutdown
|
||||
_ = wait_for_signal() => {
|
||||
info!("received shutdown signal");
|
||||
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, None).await;
|
||||
}
|
||||
_ = scraper_cancellation_token.cancelled() => {
|
||||
info!("the scraper has issued cancellation");
|
||||
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected scraper task cancellation".into())).await;
|
||||
}
|
||||
_ = main_cancellation_token.cancelled() => {
|
||||
info!("one of the tasks has cancelled the token");
|
||||
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected main task cancellation".into())).await;
|
||||
}
|
||||
task_result = tasks.join_next() => {
|
||||
// the first unwrap is fine => join set was not empty
|
||||
let error_message = match task_result.unwrap() {
|
||||
Err(_join_err) => Some("unexpected join error".to_string()),
|
||||
Ok(Some(Ok(_))) => None,
|
||||
Ok(Some(Err(err))) => Some(err.to_string()),
|
||||
Ok(None) => {
|
||||
Some("unexpected task cancellation".to_string())
|
||||
}
|
||||
};
|
||||
|
||||
error!("unexpected task termination: {error_message:?}");
|
||||
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, error_message).await;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
|
||||
trace!("passed arguments: {args:#?}");
|
||||
let start = OffsetDateTime::now_utc();
|
||||
|
||||
info!("passed arguments: {args:#?}");
|
||||
|
||||
let config = config::get_run_config(args)?;
|
||||
|
||||
@@ -29,9 +122,7 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
|
||||
);
|
||||
info!(
|
||||
"Chain History Database path is {:?}",
|
||||
std::path::Path::new(&config.chain_scraper_database_path())
|
||||
.canonicalize()
|
||||
.unwrap_or_default()
|
||||
std::path::Path::new(&config.chain_scraper_database_path()).canonicalize()
|
||||
);
|
||||
|
||||
// Ensure parent directory exists
|
||||
@@ -41,50 +132,99 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
|
||||
|
||||
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let watcher_pool = storage.pool_owned().await;
|
||||
let watcher_pool = storage.pool_owned();
|
||||
|
||||
// Spawn the chain scraper and get its storage
|
||||
let mut tasks = JoinSet::new();
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
// Spawn the payment listener task
|
||||
let payment_listener_handle = tokio::spawn({
|
||||
let price_scraper_pool = storage.pool_owned().await;
|
||||
let scraper_pool = storage.pool_owned().await;
|
||||
run_chain_scraper(&config, scraper_pool).await?;
|
||||
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
|
||||
let price_scraper_pool = storage.pool_owned();
|
||||
let scraper_pool = storage.pool_owned();
|
||||
let shutdown_pool = storage.pool_owned();
|
||||
|
||||
// spawn all the tasks
|
||||
|
||||
// 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup)
|
||||
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
|
||||
let config = config.clone();
|
||||
async move {
|
||||
if let Err(e) =
|
||||
payment_listener::run_payment_listener(payment_watcher_config, price_scraper_pool)
|
||||
.await
|
||||
{
|
||||
error!("Payment listener error: {}", e);
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
// this only blocks until startup sync is done; it then runs on its own set of tasks
|
||||
let scraper = run_chain_scraper(&config, scraper_pool).await?;
|
||||
Ok(scraper.cancel_token())
|
||||
}
|
||||
});
|
||||
|
||||
// Clone pool for each task that needs it
|
||||
//let background_pool = db_pool.clone();
|
||||
// 2. payment listener
|
||||
let token = cancellation_token.clone();
|
||||
{
|
||||
tasks.spawn(async move {
|
||||
token
|
||||
.run_until_cancelled(async move {
|
||||
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
|
||||
payment_listener::run_payment_listener(
|
||||
payment_watcher_config,
|
||||
price_scraper_pool,
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| error!("Payment listener error: {err}"))
|
||||
})
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
let price_scraper_handle = tokio::spawn(async move {
|
||||
price_scraper::run_price_scraper(&watcher_pool).await;
|
||||
});
|
||||
// 3. price scraper (note, this task never terminates on its own)
|
||||
{
|
||||
let token = cancellation_token.clone();
|
||||
tasks.spawn(async move {
|
||||
token
|
||||
.run_until_cancelled(async move {
|
||||
price_scraper::run_price_scraper(&watcher_pool).await;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
// 4. http api
|
||||
let http_server = http::server::build_http_api(storage.pool_owned(), http_port).await?;
|
||||
{
|
||||
let token = cancellation_token.clone();
|
||||
tasks.spawn(async move {
|
||||
info!("Starting HTTP server on port {http_port}",);
|
||||
async move {
|
||||
Some(
|
||||
http_server
|
||||
.run(token.cancelled_owned())
|
||||
.await
|
||||
.context("http server failure"),
|
||||
)
|
||||
}
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
info!("Started HTTP server on port {}", http_port);
|
||||
// 1. wait for either shutdown or scraper having finished startup
|
||||
tokio::select! {
|
||||
_ = wait_for_signal() => {
|
||||
info!("received shutdown signal while waiting for scraper to finish its startup");
|
||||
return Ok(())
|
||||
}
|
||||
scraper_token = scraper_token_handle => {
|
||||
let scraper_token = match scraper_token {
|
||||
Ok(Ok(token)) => token,
|
||||
Ok(Err(startup_err)) => {
|
||||
error!("failed to startup the chain scraper: {startup_err}");
|
||||
return Err(startup_err.into());
|
||||
}
|
||||
Err(runtime_err) => {
|
||||
error!("failed to finish the scraper startup task: {runtime_err}");
|
||||
return Ok(())
|
||||
|
||||
// Wait for the short-lived tasks to complete
|
||||
let _ = join!(price_scraper_handle, payment_listener_handle);
|
||||
}
|
||||
};
|
||||
|
||||
// Wait for a signal to terminate the long-running task
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
error!("{err}");
|
||||
};
|
||||
wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ impl Storage {
|
||||
}
|
||||
|
||||
/// Cloning pool is cheap, it's the same underlying set of connections
|
||||
pub async fn pool_owned(&self) -> DbPool {
|
||||
pub fn pool_owned(&self) -> DbPool {
|
||||
self.pool.clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,19 +1,14 @@
|
||||
use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use tokio::{net::TcpListener, task::JoinHandle};
|
||||
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::WaitForCancellationFutureOwned;
|
||||
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::{api::RouterBuilder, state::AppState},
|
||||
};
|
||||
|
||||
/// Return handles that allow for graceful shutdown of server + awaiting its
|
||||
/// background tokio task
|
||||
pub(crate) async fn start_http_api(
|
||||
db_pool: DbPool,
|
||||
http_port: u16,
|
||||
) -> anyhow::Result<ShutdownHandles> {
|
||||
pub(crate) async fn build_http_api(db_pool: DbPool, http_port: u16) -> anyhow::Result<HttpServer> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
let state = AppState::new(db_pool);
|
||||
@@ -21,50 +16,7 @@ pub(crate) async fn start_http_api(
|
||||
|
||||
let bind_addr = format!("0.0.0.0:{}", http_port);
|
||||
let server = router.build_server(bind_addr).await?;
|
||||
|
||||
Ok(start_server(server))
|
||||
}
|
||||
|
||||
fn start_server(server: HttpServer) -> ShutdownHandles {
|
||||
// one copy is stored to trigger a graceful shutdown later
|
||||
let shutdown_button = CancellationToken::new();
|
||||
// other copy is given to server to listen for a shutdown
|
||||
let shutdown_receiver = shutdown_button.clone();
|
||||
let shutdown_receiver = shutdown_receiver.cancelled_owned();
|
||||
|
||||
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
|
||||
|
||||
ShutdownHandles {
|
||||
server_handle,
|
||||
shutdown_button,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ShutdownHandles {
|
||||
server_handle: JoinHandle<std::io::Result<()>>,
|
||||
shutdown_button: CancellationToken,
|
||||
}
|
||||
|
||||
impl ShutdownHandles {
|
||||
/// Send graceful shutdown signal to server and wait for server task to complete
|
||||
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
|
||||
self.shutdown_button.cancel();
|
||||
|
||||
match self.server_handle.await {
|
||||
Ok(Ok(_)) => {
|
||||
tracing::info!("HTTP server shut down without errors");
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!("HTTP server terminated with: {err}");
|
||||
anyhow::bail!(err)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Server task panicked: {err}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
pub(crate) struct HttpServer {
|
||||
|
||||
@@ -3,7 +3,6 @@ use crate::db::{
|
||||
queries::price::insert_nym_prices,
|
||||
};
|
||||
use core::str;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
@@ -13,7 +12,7 @@ const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
const COINGECKO_API_URL: &str =
|
||||
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,gbp,btc";
|
||||
|
||||
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
|
||||
pub(crate) async fn run_price_scraper(db_pool: &DbPool) {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
if let Err(e) = get_coingecko_prices(db_pool).await {
|
||||
|
||||
Reference in New Issue
Block a user