Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 291544c311 |
@@ -30,7 +30,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
|
||||
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
|
||||
runs-on: ${{ matrix.os }}
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
|
||||
@@ -183,6 +183,7 @@ impl PacketSender {
|
||||
gateway_packet_router,
|
||||
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
|
||||
nym_statistics_common::clients::ClientStatsSender::new(None),
|
||||
#[cfg(unix)]
|
||||
None,
|
||||
task_client,
|
||||
);
|
||||
|
||||
@@ -1,8 +1,22 @@
|
||||
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_bin_common::bin_info;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{debug, info, warn};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{
|
||||
cli::Cli,
|
||||
deposit_maker::DepositMaker,
|
||||
error::VpnApiError,
|
||||
http::{
|
||||
state::{ApiState, ChainClient},
|
||||
HttpServer,
|
||||
},
|
||||
storage::VpnApiStorage,
|
||||
tasks::StoragePruner,
|
||||
};
|
||||
|
||||
pub struct LockTimer {
|
||||
created: OffsetDateTime,
|
||||
@@ -40,3 +54,88 @@ impl Default for LockTimer {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait_for_signal() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
// if we fail to setup the signals, we should just blow up
|
||||
#[allow(clippy::expect_used)]
|
||||
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
|
||||
#[allow(clippy::expect_used)]
|
||||
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
info!("Received SIGINT");
|
||||
},
|
||||
_ = sigterm.recv() => {
|
||||
info!("Received SIGTERM");
|
||||
}
|
||||
_ = sigquit.recv() => {
|
||||
info!("Received SIGQUIT");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_sha_short() -> &'static str {
|
||||
let bin_info = bin_info!();
|
||||
if bin_info.commit_sha.len() < 7 {
|
||||
panic!("unavailable build commit sha")
|
||||
}
|
||||
|
||||
if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
|
||||
error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
&bin_info.commit_sha[..7]
|
||||
}
|
||||
|
||||
pub(crate) async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
|
||||
// create the tasks
|
||||
let bind_address = cli.bind_address();
|
||||
|
||||
let storage = VpnApiStorage::init(cli.persistent_storage_path()).await?;
|
||||
let mnemonic = cli.mnemonic;
|
||||
let auth_token = cli.http_auth_token;
|
||||
let webhook_cfg = cli.webhook;
|
||||
let chain_client = ChainClient::new(mnemonic)?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let deposit_maker = DepositMaker::new(
|
||||
build_sha_short(),
|
||||
chain_client.clone(),
|
||||
cli.max_concurrent_deposits,
|
||||
cancellation_token.clone(),
|
||||
);
|
||||
|
||||
let deposit_request_sender = deposit_maker.deposit_request_sender();
|
||||
let api_state = ApiState::new(
|
||||
storage.clone(),
|
||||
webhook_cfg,
|
||||
chain_client,
|
||||
deposit_request_sender,
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
let http_server = HttpServer::new(
|
||||
bind_address,
|
||||
api_state.clone(),
|
||||
auth_token,
|
||||
cancellation_token.clone(),
|
||||
);
|
||||
let storage_pruner = StoragePruner::new(cancellation_token, storage);
|
||||
|
||||
// spawn all the tasks
|
||||
api_state.try_spawn(http_server.run_forever());
|
||||
api_state.try_spawn(storage_pruner.run_forever());
|
||||
api_state.try_spawn(deposit_maker.run_forever());
|
||||
|
||||
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
|
||||
wait_for_signal().await;
|
||||
|
||||
// cancel all the tasks and wait for all task to terminate
|
||||
api_state.cancel_and_wait().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -6,117 +6,30 @@
|
||||
#![warn(clippy::todo)]
|
||||
#![warn(clippy::dbg_macro)]
|
||||
|
||||
use crate::cli::Cli;
|
||||
use crate::deposit_maker::DepositMaker;
|
||||
use crate::error::VpnApiError;
|
||||
use crate::http::state::{ApiState, ChainClient};
|
||||
use crate::http::HttpServer;
|
||||
use crate::storage::VpnApiStorage;
|
||||
use crate::tasks::StoragePruner;
|
||||
use clap::Parser;
|
||||
use nym_bin_common::logging::setup_tracing_logger;
|
||||
use nym_bin_common::{bin_info, bin_info_owned};
|
||||
use nym_network_defaults::setup_env;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, trace};
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(unix)] {
|
||||
use crate::cli::Cli;
|
||||
use clap::Parser;
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_bin_common::logging::setup_tracing_logger;
|
||||
use nym_network_defaults::setup_env;
|
||||
use tracing::{info, trace};
|
||||
|
||||
pub mod cli;
|
||||
pub mod config;
|
||||
pub mod credentials;
|
||||
mod deposit_maker;
|
||||
pub mod error;
|
||||
pub mod helpers;
|
||||
pub mod http;
|
||||
pub mod nym_api_helpers;
|
||||
pub mod storage;
|
||||
pub mod tasks;
|
||||
mod webhook;
|
||||
|
||||
pub async fn wait_for_signal() {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
// if we fail to setup the signals, we should just blow up
|
||||
#[allow(clippy::expect_used)]
|
||||
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
|
||||
#[allow(clippy::expect_used)]
|
||||
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
info!("Received SIGINT");
|
||||
},
|
||||
_ = sigterm.recv() => {
|
||||
info!("Received SIGTERM");
|
||||
}
|
||||
_ = sigquit.recv() => {
|
||||
info!("Received SIGQUIT");
|
||||
}
|
||||
pub mod cli;
|
||||
pub mod config;
|
||||
pub mod credentials;
|
||||
mod deposit_maker;
|
||||
pub mod error;
|
||||
pub mod helpers;
|
||||
pub mod http;
|
||||
pub mod nym_api_helpers;
|
||||
pub mod storage;
|
||||
pub mod tasks;
|
||||
mod webhook;
|
||||
}
|
||||
}
|
||||
|
||||
fn build_sha_short() -> &'static str {
|
||||
let bin_info = bin_info!();
|
||||
if bin_info.commit_sha.len() < 7 {
|
||||
panic!("unavailable build commit sha")
|
||||
}
|
||||
|
||||
if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
|
||||
error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
&bin_info.commit_sha[..7]
|
||||
}
|
||||
|
||||
async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
|
||||
// create the tasks
|
||||
let bind_address = cli.bind_address();
|
||||
|
||||
let storage = VpnApiStorage::init(cli.persistent_storage_path()).await?;
|
||||
let mnemonic = cli.mnemonic;
|
||||
let auth_token = cli.http_auth_token;
|
||||
let webhook_cfg = cli.webhook;
|
||||
let chain_client = ChainClient::new(mnemonic)?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let deposit_maker = DepositMaker::new(
|
||||
build_sha_short(),
|
||||
chain_client.clone(),
|
||||
cli.max_concurrent_deposits,
|
||||
cancellation_token.clone(),
|
||||
);
|
||||
|
||||
let deposit_request_sender = deposit_maker.deposit_request_sender();
|
||||
let api_state = ApiState::new(
|
||||
storage.clone(),
|
||||
webhook_cfg,
|
||||
chain_client,
|
||||
deposit_request_sender,
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
let http_server = HttpServer::new(
|
||||
bind_address,
|
||||
api_state.clone(),
|
||||
auth_token,
|
||||
cancellation_token.clone(),
|
||||
);
|
||||
let storage_pruner = StoragePruner::new(cancellation_token, storage);
|
||||
|
||||
// spawn all the tasks
|
||||
api_state.try_spawn(http_server.run_forever());
|
||||
api_state.try_spawn(storage_pruner.run_forever());
|
||||
api_state.try_spawn(deposit_maker.run_forever());
|
||||
|
||||
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
|
||||
wait_for_signal().await;
|
||||
|
||||
// cancel all the tasks and wait for all task to terminate
|
||||
api_state.cancel_and_wait().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// std::env::set_var(
|
||||
@@ -134,6 +47,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
let bin_info = bin_info_owned!();
|
||||
info!("using the following version: {bin_info}");
|
||||
|
||||
run_api(cli).await?;
|
||||
helpers::run_api(cli).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
eprintln!("This tool is only supported on Unix systems");
|
||||
std::process::exit(1)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{Connection, SqliteConnection};
|
||||
#[cfg(target_family = "unix")]
|
||||
use std::fs::Permissions;
|
||||
#[cfg(target_family = "unix")]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use tokio::{fs::File, io::AsyncWriteExt};
|
||||
|
||||
@@ -39,7 +41,10 @@ async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Resu
|
||||
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
|
||||
.await?;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
file.set_permissions(Permissions::from_mode(0o755))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(anyhow::Error::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient, UserAgent};
|
||||
use rand::rngs::OsRng;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
#[cfg(unix)]
|
||||
use std::sync::Arc;
|
||||
use url::Url;
|
||||
use zeroize::Zeroizing;
|
||||
@@ -56,6 +57,7 @@ pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
|
||||
custom_shutdown: Option<TaskClient>,
|
||||
force_tls: bool,
|
||||
user_agent: Option<UserAgent>,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>>,
|
||||
|
||||
// TODO: incorporate it properly into `MixnetClientStorage` (I will need it in wasm anyway)
|
||||
@@ -256,6 +258,7 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[must_use]
|
||||
pub fn with_connection_fd_callback(
|
||||
mut self,
|
||||
@@ -293,7 +296,10 @@ where
|
||||
client.wait_for_gateway = self.wait_for_gateway;
|
||||
client.force_tls = self.force_tls;
|
||||
client.user_agent = self.user_agent;
|
||||
client.connection_fd_callback = self.connection_fd_callback;
|
||||
#[cfg(unix)]
|
||||
if self.connection_fd_callback.is_some() {
|
||||
client.connection_fd_callback = self.connection_fd_callback;
|
||||
}
|
||||
client.forget_me = self.forget_me;
|
||||
Ok(client)
|
||||
}
|
||||
@@ -345,6 +351,7 @@ where
|
||||
user_agent: Option<UserAgent>,
|
||||
|
||||
/// Callback on the websocket fd as soon as the connection has been established
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(std::os::fd::RawFd) + Send + Sync>>,
|
||||
|
||||
forget_me: ForgetMe,
|
||||
@@ -397,6 +404,7 @@ where
|
||||
force_tls: false,
|
||||
custom_shutdown: None,
|
||||
user_agent: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
forget_me: Default::default(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user