Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown timeout #1374

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions core/lib/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct Config {
pub(crate) config_file_path: Option<PathBuf>,
/// The path root-relative files will be rooted from.
pub(crate) root_path: Option<PathBuf>,
/// Default duraion to hold open a connection on shutting down.
pub wait_on_shutdown: u32,
}

macro_rules! config_from_raw {
Expand Down Expand Up @@ -283,6 +285,7 @@ impl Config {
extras: HashMap::new(),
config_file_path: None,
root_path: None,
wait_on_shutdown: 0,
}
}
Staging => {
Expand All @@ -299,6 +302,7 @@ impl Config {
extras: HashMap::new(),
config_file_path: None,
root_path: None,
wait_on_shutdown: 1,
}
}
Production => {
Expand All @@ -315,6 +319,7 @@ impl Config {
extras: HashMap::new(),
config_file_path: None,
root_path: None,
wait_on_shutdown: 1,
}
}
})
Expand Down Expand Up @@ -358,6 +363,7 @@ impl Config {
secret_key => (str, set_secret_key, id),
tls => (tls_config, set_raw_tls, id),
limits => (limits, set_limits, ok),
wait_on_shutdown => (u32, set_wait_on_shutdown, ok),
| _ => {
self.extras.insert(name.into(), val.clone());
Ok(())
Expand Down Expand Up @@ -426,6 +432,22 @@ impl Config {
self.port = port;
}

/// Sets the amount of time to wait for a response to finish on graceful shutdown (in seconds).
///
/// # Example
///
/// ```rust
/// use rocket::config::{Config, Environment};
///
/// let mut config = Config::new(Environment::Staging);
/// config.set_wait_on_shutdown(2);
/// assert_eq!(config.wait_on_shutdown, 2);
/// ```
#[inline]
pub fn set_wait_on_shutdown(&mut self, wait_on_shutdown: u32) {
self.wait_on_shutdown = wait_on_shutdown;
}

/// Sets the number of `workers` in `self` to `workers`.
///
/// # Example
Expand Down Expand Up @@ -972,6 +994,7 @@ impl fmt::Debug for Config {
s.field("workers", &self.workers);
s.field("keep_alive", &self.keep_alive);
s.field("log_level", &self.log_level);
s.field("wait_on_shutdown", &self.wait_on_shutdown);

for (key, value) in self.extras() {
s.field(key, &value);
Expand Down
25 changes: 13 additions & 12 deletions core/lib/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@
//! not used by Rocket itself but can be used by external libraries. The
//! standard configuration parameters are:
//!
//! | name | type | description | examples |
//! |------------|----------------|-------------------------------------------------------------|----------------------------|
//! | address | string | ip address or host to listen on | `"localhost"`, `"1.2.3.4"` |
//! | port | integer | port number to listen on | `8000`, `80` |
//! | keep_alive | integer | keep-alive timeout in seconds | `0` (disable), `10` |
//! | workers | integer | number of concurrent thread workers | `36`, `512` |
//! | log | string | max log level: `"off"`, `"normal"`, `"debug"`, `"critical"` | `"off"`, `"normal"` |
//! | secret_key | 256-bit base64 | secret key for private cookies | `"8Xui8SI..."` (44 chars) |
//! | tls | table | tls config table with two keys (`certs`, `key`) | _see below_ |
//! | tls.certs | string | path to certificate chain in PEM format | `"private/cert.pem"` |
//! | tls.key | string | path to private key for `tls.certs` in PEM format | `"private/key.pem"` |
//! | limits | table | map from data type (string) to data limit (integer: bytes) | `{ forms = 65536 }` |
//! | name | type | description | examples |
//! |------------------|----------------|-------------------------------------------------------------|----------------------------|
//! | address | string | ip address or host to listen on | `"localhost"`, `"1.2.3.4"` |
//! | port | integer | port number to listen on | `8000`, `80` |
//! | keep_alive | integer | keep-alive timeout in seconds | `0` (disable), `10` |
//! | wait_on_shutdown | integer | timeout in seconds when stopping the server | `0` (disable), `10` |
//! | workers | integer | number of concurrent thread workers | `36`, `512` |
//! | log | string | max log level: `"off"`, `"normal"`, `"debug"`, `"critical"` | `"off"`, `"normal"` |
//! | secret_key | 256-bit base64 | secret key for private cookies | `"8Xui8SI..."` (44 chars) |
//! | tls | table | tls config table with two keys (`certs`, `key`) | _see below_ |
//! | tls.certs | string | path to certificate chain in PEM format | `"private/cert.pem"` |
//! | tls.key | string | path to private key for `tls.certs` in PEM format | `"private/key.pem"` |
//! | limits | table | map from data type (string) to data limit (integer: bytes) | `{ forms = 65536 }` |
//!
//! ### Rocket.toml
//!
Expand Down
41 changes: 41 additions & 0 deletions core/lib/src/response/response.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{io, fmt, str};
use std::time::Duration;
use std::borrow::Cow;
use std::pin::Pin;

Expand Down Expand Up @@ -235,6 +236,19 @@ impl<'r> ResponseBuilder<'r> {
self
}

/// Sets the finish-on-shutdown delay.
///
/// If this response is set to finish on shutdown, then Rocket will wait
/// until the body is completely served, or the timer runs out. Otherwise,
/// the client may experience a partially-completed download.
///
/// The finish-on-shutdown delay defaults to `0`.
#[inline(always)]
pub fn wait_on_shutdown(&mut self, wait_on_shutdown: Duration) -> &mut ResponseBuilder<'r> {
self.response.set_wait_on_shutdown(wait_on_shutdown);
self
}

/// Sets the status of the `Response` being built to a custom status
/// constructed from the `code` and `reason` phrase.
///
Expand Down Expand Up @@ -598,6 +612,7 @@ pub struct Response<'r> {
status: Option<Status>,
headers: HeaderMap<'r>,
body: Option<ResponseBody<'r>>,
wait_on_shutdown: Duration,
}

impl<'r> Response<'r> {
Expand All @@ -624,6 +639,7 @@ impl<'r> Response<'r> {
status: None,
headers: HeaderMap::new(),
body: None,
wait_on_shutdown: Duration::from_millis(0),
}
}

Expand Down Expand Up @@ -658,6 +674,31 @@ impl<'r> Response<'r> {
ResponseBuilder::new(other)
}

/// Returns the finish-on-shutdown delay.
///
/// If this response is set to a value other than zero, then Rocket
/// will wait until the body is completely served, or the timer
/// runs out. Otherwise, the client may experience a
/// partially-completed download.
///
/// The finish-on-shutdown delay defaults to `0`.
#[inline(always)]
pub fn wait_on_shutdown(&self) -> Duration {
self.wait_on_shutdown
}

/// Sets the finish-on-shutdown delay.
///
/// If this response is set to finish on shutdown, then Rocket will wait
/// until the body is completely served. Otherwise, the client may
/// experience a partially-completed download.
///
/// The finish-on-shutdown delay defaults to `0`.
#[inline(always)]
pub fn set_wait_on_shutdown(&mut self, wait_on_shutdown: Duration) {
self.wait_on_shutdown = wait_on_shutdown;
}

/// Returns the status of `self`.
///
/// # Example
Expand Down
66 changes: 49 additions & 17 deletions core/lib/src/rocket.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::{io, mem};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::collections::HashMap;
use std::time::Duration;
use std::cmp::min;

#[allow(unused_imports)]
use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::future::{Future, BoxFuture};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{broadcast, oneshot};
use ref_cast::RefCast;

use yansi::Paint;
Expand Down Expand Up @@ -41,7 +44,7 @@ pub struct Rocket {
default_catcher: Option<Catcher>,
catchers: HashMap<u16, Catcher>,
fairings: Fairings,
shutdown_receiver: Option<mpsc::Receiver<()>>,
shutdown_receiver: Option<broadcast::Receiver<()>>,
pub(crate) shutdown_handle: Shutdown,
}

Expand Down Expand Up @@ -118,6 +121,7 @@ impl Rocket {

// Create a "dummy" instance of `Rocket` to use while mem-swapping `self`.
fn dummy() -> Rocket {
let (tx, _) = broadcast::channel(1);
Rocket {
manifest: vec![],
config: Config::development(),
Expand All @@ -126,7 +130,7 @@ impl Rocket {
catchers: HashMap::new(),
managed_state: Container::new(),
fairings: Fairings::new(),
shutdown_handle: Shutdown(mpsc::channel(1).0),
shutdown_handle: Shutdown(tx, Arc::new(AtomicBool::new(false))),
shutdown_receiver: None,
}
}
Expand Down Expand Up @@ -189,6 +193,10 @@ async fn hyper_service_fn(
// the response metadata (and a body channel) beforehand.
let (tx, rx) = oneshot::channel();

// The shutdown subscription needs to be opened before dispatching the request.
// Otherwise, if shutdown begins during initial request processing, we would miss it.
let shutdown_receiver = rocket.shutdown_handle.0.subscribe();

tokio::spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();
Expand All @@ -205,7 +213,7 @@ async fn hyper_service_fn(
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
return rocket.issue_response(r, tx, shutdown_receiver).await;
}
};

Expand All @@ -215,7 +223,7 @@ async fn hyper_service_fn(
// Dispatch the request to get a response, then write that response out.
let token = rocket.preprocess_request(&mut req, &mut data).await;
let r = rocket.dispatch(token, &mut req, data).await;
rocket.issue_response(r, tx).await;
rocket.issue_response(r, tx, shutdown_receiver).await;
});

rx.await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
Expand All @@ -227,14 +235,37 @@ impl Rocket {
&self,
response: Response<'_>,
tx: oneshot::Sender<hyper::Response<hyper::Body>>,
mut shutdown_receiver: broadcast::Receiver<()>,
) {
let result = self.write_response(response, tx);
match result.await {
Ok(()) => {
info_!("{}", Paint::green("Response succeeded."));
let wait_on_shutdown = {
let wait_on_shutdown = response.wait_on_shutdown();
min(wait_on_shutdown, Duration::from_millis((self.config.wait_on_shutdown as u64) * 1_000))
};
let mut shutdown_receiver = {
let (tx, rx) = broadcast::channel(1);
tokio::spawn(async move {
let _ = shutdown_receiver.recv().await;
tokio::time::delay_for(wait_on_shutdown).await;
tx.send(()).expect("there should always be at least one shutdown listener");
});
rx
};
tokio::select!{
result = self.write_response(response, tx) => {
match result {
Ok(()) => {
info_!("{}", Paint::green("Response succeeded."));
}
Err(e) => {
error_!("Failed to write response: {:?}.", e);
}
}
}
Err(e) => {
error_!("Failed to write response: {:?}.", e);
// The error returned by `recv()` is discarded here.
// This is fine, because the only case where it returns that error is
// if the sender is dropped, which would indicate shutdown anyway.
_ = shutdown_receiver.recv() => {
info_!("{}", Paint::red("Response cancelled for shutdown."));
}
}
}
Expand Down Expand Up @@ -517,8 +548,7 @@ impl Rocket {
// listener.set_keepalive(timeout);

// We need to get this before moving `self` into an `Arc`.
let mut shutdown_receiver = self.shutdown_receiver
.take().expect("shutdown receiver has already been used");
let mut shutdown_receiver = self.shutdown_receiver.take().expect("a rocket will listen exactly once");

let rocket = Arc::new(self);
let service = hyper::make_service_fn(move |connection: &<L as Listener>::Connection| {
Expand All @@ -545,7 +575,9 @@ impl Rocket {
hyper::Server::builder(Incoming::from_listener(listener))
.executor(TokioExecutor)
.serve(service)
.with_graceful_shutdown(async move { shutdown_receiver.recv().await; })
// Discarding the error is fine, because it indicates that the sender has been dropped.
// If the sender is dropped, then the Rocket is dropped, and that means we're shutting down.
.with_graceful_shutdown(async move { let _ = shutdown_receiver.recv().await; })
.await
.map_err(|e| crate::error::Error::Run(Box::new(e)))
}
Expand Down Expand Up @@ -661,17 +693,17 @@ impl Rocket {
}

let managed_state = Container::new();
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
let (shutdown_sender, shutdown_receiver) = broadcast::channel(1);

Rocket {
config, managed_state,
shutdown_handle: Shutdown(shutdown_sender),
shutdown_handle: Shutdown(shutdown_sender, Arc::new(AtomicBool::new(false))),
shutdown_receiver: Some(shutdown_receiver),
manifest: vec![],
router: Router::new(),
default_catcher: None,
catchers: HashMap::new(),
fairings: Fairings::new(),
shutdown_receiver: Some(shutdown_receiver),
}
}

Expand Down
Loading