Skip to content

Commit

Permalink
Fix doctest
Browse files Browse the repository at this point in the history
  • Loading branch information
notriddle committed Sep 17, 2020
1 parent 7e22da0 commit 2129f21
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions core/lib/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::request::{FromRequest, Outcome, Request};
use futures::future::{FutureExt, Future, FusedFuture};
use tokio::sync::broadcast;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -56,18 +57,21 @@ impl Shutdown {
/// so that the responder can short-circuit a long-running operation.
///
/// ```rust,no_run
/// # #[macro_use] extern crate rocket;
/// # use rocket::Shutdown;
/// #
/// use futures::{future, select};
/// #[get("/shutdown")]
/// async fn wait_for_shutdown(handle: Shutdown) -> &'static str {
/// let shutdown_future = handle.wait();
/// let long_running_operation = future::pending::<()>();
/// let result = select! {
/// let mut shutdown_future = handle.wait();
/// let mut long_running_operation = future::pending::<()>();
/// select! {
/// _ = shutdown_future => "shutting down...",
/// _ = long_running_operation => "complete ok",
/// }.await
/// }
/// }
/// ```
pub async fn wait(self) {
pub fn wait(self) -> impl Future<Output=()> + Unpin + FusedFuture {
// This uses four events:
//
// * the store event
Expand All @@ -85,10 +89,24 @@ impl Shutdown {
// before the subscribe, then it must have also come before the load, which means that the load will
// pick up on the atomic change. If the subscribe came before the store, then it also came before
// the send, meaning that the broadcast channel will kick us out instead.
let mut recv = self.0.subscribe();
if !self.1.load(Ordering::SeqCst) {
let _ = recv.recv().await;
}

// To be useful in futures::select!, this future must be Unpin,
// which, because it holds a reference to its channel, basically means
// it has to be boxed.
let Shutdown(chan, flag) = self;
tokio::task::spawn(async move {
let mut recv = chan.subscribe();
if !flag.load(Ordering::SeqCst) {
// Ignore errors, because if it fails to recv,
// then that means the sender has been dropped,
// which means that the system is shutting down.
recv.recv().await.unwrap_or(())
}
// Ignore join errors, because if join fails, then that means the task
// either panicked, or was cancelled. It doesn't get cancelled,
// and if it panicked, then that means the channel was already
// dropped and we're shutting down anyway.
}).map(|e| e.unwrap_or(())).fuse()
}
}

Expand Down

0 comments on commit 2129f21

Please sign in to comment.