Skip to content

Commit

Permalink
dev: cleanup launcher
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Dec 28, 2023
1 parent 933eacb commit 8accd9f
Show file tree
Hide file tree
Showing 19 changed files with 482 additions and 662 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use tokio::task::JoinHandle;
use torrust_tracker_configuration::Configuration;

use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker};
use crate::core;
use crate::servers::http::Version;
use crate::{core, servers};

/// # Panics
///
Expand Down Expand Up @@ -68,21 +67,22 @@ pub async fn start(config: Arc<Configuration>, tracker: Arc<core::Tracker>) -> V
udp_tracker_config.bind_address, config.mode
);
} else {
jobs.push(udp_tracker::start_job(udp_tracker_config, tracker.clone()));
jobs.push(udp_tracker::start_job(udp_tracker_config, tracker.clone()).await);
}
}

// Start the HTTP blocks
for http_tracker_config in &config.http_trackers {
if !http_tracker_config.enabled {
continue;
}
jobs.push(http_tracker::start_job(http_tracker_config, tracker.clone(), Version::V1).await);
if let Some(job) = http_tracker::start_job(http_tracker_config, tracker.clone(), servers::http::Version::V1).await {
jobs.push(job);
};
}

// Start HTTP API
if config.http_api.enabled {
jobs.push(tracker_apis::start_job(&config.http_api, tracker.clone()).await);
if let Some(job) = tracker_apis::start_job(&config.http_api, tracker.clone(), servers::apis::Version::V1).await {
jobs.push(job);
};
}

// Start runners to remove torrents without peers, every interval
Expand Down
21 changes: 5 additions & 16 deletions src/bootstrap/jobs/health_check_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,16 @@
//!
//! Refer to the [configuration documentation](https://docs.rs/torrust-tracker-configuration)
//! for the API configuration options.
use std::net::SocketAddr;
use std::sync::Arc;

use log::info;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::Configuration;

use super::Started;
use crate::servers::health_check_api::server;

/// This is the message that the "launcher" spawned task sends to the main
/// application process to notify the API server was successfully started.
///
/// > **NOTICE**: it does not mean the API server is ready to receive requests.
/// It only means the new server started. It might take some time to the server
/// to be ready to accept request.
#[derive(Debug)]
pub struct ApiServerJobStarted {
pub bound_addr: SocketAddr,
}

/// This function starts a new Health Check API server with the provided
/// configuration.
///
Expand All @@ -53,22 +42,22 @@ pub async fn start_job(config: Arc<Configuration>) -> JoinHandle<()> {
.parse::<std::net::SocketAddr>()
.expect("Health Check API bind_address invalid.");

let (tx, rx) = oneshot::channel::<ApiServerJobStarted>();
let (tx_start, rx_start) = oneshot::channel::<Started>();

// Run the API server
let join_handle = tokio::spawn(async move {
info!("Starting Health Check API server: http://{}", bind_addr);

let handle = server::start(bind_addr, tx, config.clone());
let handle = server::start(bind_addr, tx_start, config.clone());

if let Ok(()) = handle.await {
info!("Health Check API server on http://{} stopped", bind_addr);
}
});

// Wait until the API server job is running
match rx.await {
Ok(_msg) => info!("Torrust Health Check API server started"),
match rx_start.await {
Ok(msg) => info!("Torrust Health Check API server started on socket: {}", msg.address),
Err(e) => panic!("the Health Check API server was dropped: {e}"),
}

Expand Down
97 changes: 38 additions & 59 deletions src/bootstrap/jobs/http_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,84 +11,63 @@
//!
//! The "**launcher**" is an intermediary thread that decouples the HTTP servers from the process that handles it. The HTTP could be used independently in the future.
//! In that case it would not need to notify a parent process.
use std::net::SocketAddr;
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use log::info;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::HttpTracker;

use crate::core;
use crate::servers::http::v1::launcher;
use crate::servers::http::server::{HttpServer, HttpServerLauncher};
use crate::servers::http::v1::launcher::Launcher;
use crate::servers::http::Version;

/// This is the message that the "**launcher**" spawned task sends to the main application process to notify that the HTTP server was successfully started.
///
/// > **NOTICE**: it does not mean the HTTP server is ready to receive requests. It only means the new server started. It might take some time to the server to be ready to accept request.
#[derive(Debug)]
pub struct ServerJobStarted();

/// It starts a new HTTP server with the provided configuration and version.
///
/// Right now there is only one version but in the future we could support more than one HTTP tracker version at the same time.
/// This feature allows supporting breaking changes on `BitTorrent` BEPs.
pub async fn start_job(config: &HttpTracker, tracker: Arc<core::Tracker>, version: Version) -> JoinHandle<()> {
match version {
Version::V1 => start_v1(config, tracker.clone()).await,
}
}

///
/// # Panics
///
/// It would panic if the `config::HttpTracker` struct would contain inappropriate values.
async fn start_v1(config: &HttpTracker, tracker: Arc<core::Tracker>) -> JoinHandle<()> {
let bind_addr = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");
let ssl_enabled = config.ssl_enabled;
let ssl_cert_path = config.ssl_cert_path.clone();
let ssl_key_path = config.ssl_key_path.clone();

let (tx, rx) = oneshot::channel::<ServerJobStarted>();

// Run the API server
let join_handle = tokio::spawn(async move {
if !ssl_enabled {
info!("Starting Torrust HTTP tracker server on: http://{}", bind_addr);

let handle = launcher::start(bind_addr, tracker);

tx.send(ServerJobStarted())
.expect("the HTTP tracker server should not be dropped");

if let Ok(()) = handle.await {
info!("Torrust HTTP tracker server on http://{} stopped", bind_addr);
}
} else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() {
info!("Starting Torrust HTTP tracker server on: https://{}", bind_addr);

let ssl_config = RustlsConfig::from_pem_file(ssl_cert_path.unwrap(), ssl_key_path.unwrap())
///
pub async fn start_job(config: &HttpTracker, tracker: Arc<core::Tracker>, version: Version) -> Option<JoinHandle<()>> {
if config.enabled {
let socket = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");

let tls = if let (true, Some(cert), Some(key)) = (&config.ssl_enabled, &config.ssl_cert_path, &config.ssl_key_path) {
let tls = RustlsConfig::from_pem_file(cert, key)
.await
.unwrap();

let handle = launcher::start_tls(bind_addr, ssl_config, tracker);

tx.send(ServerJobStarted())
.expect("the HTTP tracker server should not be dropped");

if let Ok(()) = handle.await {
info!("Torrust HTTP tracker server on https://{} stopped", bind_addr);
}
.expect("Could not read tls cert.");
info!("Using https: cert path: {cert}.");
info!("Using https: key path: {cert}.");
Some(tls)
} else {
info!("Loading HTTP tracker without TLS.");
None
};

match version {
Version::V1 => Some(start_v1(socket, tls, tracker.clone()).await),
}
});

// Wait until the HTTP tracker server job is running
match rx.await {
Ok(_msg) => info!("Torrust HTTP tracker server started"),
Err(e) => panic!("the HTTP tracker server was dropped: {e}"),
} else {
info!("Note: Not loading Http Tracker Service, Not Enabled in Configuration.");
None
}
}

async fn start_v1(socket: SocketAddr, tls: Option<RustlsConfig>, tracker: Arc<core::Tracker>) -> JoinHandle<()> {
let server = HttpServer::new(Launcher::new(socket, tls))
.start(tracker)
.await
.expect("Failed to start Server");

join_handle
tokio::spawn(async move {
server.state.task.await.expect("failed to finish service");
})
}
8 changes: 8 additions & 0 deletions src/bootstrap/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ pub mod http_tracker;
pub mod torrent_cleanup;
pub mod tracker_apis;
pub mod udp_tracker;

/// This is the message that the "launcher" spawned task sends to the main
/// application process to notify the service was successfully started.
///
#[derive(Debug)]
pub struct Started {
pub address: std::net::SocketAddr,
}
79 changes: 36 additions & 43 deletions src/bootstrap/jobs/tracker_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
//!
//! Refer to the [configuration documentation](https://docs.rs/torrust-tracker-configuration)
//! for the API configuration options.
use std::net::SocketAddr;
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use log::info;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::HttpApi;

use crate::core;
use crate::servers::apis::server;
use crate::servers::apis::server::{ApiServer, Launcher};
use crate::servers::apis::Version;

/// This is the message that the "launcher" spawned task sends to the main
/// application process to notify the API server was successfully started.
Expand All @@ -49,51 +50,43 @@ pub struct ApiServerJobStarted();
/// # Panics
///
/// It would panic if unable to send the `ApiServerJobStarted` notice.
pub async fn start_job(config: &HttpApi, tracker: Arc<core::Tracker>) -> JoinHandle<()> {
let bind_addr = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");
let ssl_enabled = config.ssl_enabled;
let ssl_cert_path = config.ssl_cert_path.clone();
let ssl_key_path = config.ssl_key_path.clone();

let (tx, rx) = oneshot::channel::<ApiServerJobStarted>();

// Run the API server
let join_handle = tokio::spawn(async move {
if !ssl_enabled {
info!("Starting Torrust APIs server on: http://{}", bind_addr);

let handle = server::start(bind_addr, tracker);

tx.send(ApiServerJobStarted()).expect("the API server should not be dropped");

if let Ok(()) = handle.await {
info!("Torrust APIs server on http://{} stopped", bind_addr);
}
} else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() {
info!("Starting Torrust APIs server on: https://{}", bind_addr);
///
///
pub async fn start_job(config: &HttpApi, tracker: Arc<core::Tracker>, version: Version) -> Option<JoinHandle<()>> {
if config.enabled {
let socket = config
.bind_address
.parse::<std::net::SocketAddr>()
.expect("Tracker API bind_address invalid.");

let ssl_config = RustlsConfig::from_pem_file(ssl_cert_path.unwrap(), ssl_key_path.unwrap())
let tls = if let (true, Some(cert), Some(key)) = (&config.ssl_enabled, &config.ssl_cert_path, &config.ssl_key_path) {
let tls = RustlsConfig::from_pem_file(cert, key)
.await
.unwrap();

let handle = server::start_tls(bind_addr, ssl_config, tracker);

tx.send(ApiServerJobStarted()).expect("the API server should not be dropped");
.expect("Could not read tls cert.");
info!("Using https: cert path: {cert}.");
info!("Using https: key path: {cert}.");
Some(tls)
} else {
info!("Loading HTTP tracker without TLS.");
None
};

if let Ok(()) = handle.await {
info!("Torrust APIs server on https://{} stopped", bind_addr);
}
match version {
Version::V1 => Some(start_v1(socket, tls, tracker.clone()).await),
}
});

// Wait until the APIs server job is running
match rx.await {
Ok(_msg) => info!("Torrust APIs server started"),
Err(e) => panic!("the API server was dropped: {e}"),
} else {
info!("Note: Not loading Http Tracker Service, Not Enabled in Configuration.");
None
}
}

async fn start_v1(socket: SocketAddr, tls: Option<RustlsConfig>, tracker: Arc<core::Tracker>) -> JoinHandle<()> {
let server = ApiServer::new(Launcher::new(socket, tls))
.start(tracker)
.await
.expect("Failed to start Server");

join_handle
tokio::spawn(async move {
server.state.task.await.expect("failed to close service");
})
}
Loading

0 comments on commit 8accd9f

Please sign in to comment.