From 2129f210ad2bf08dafc58a482919d9c39bbc4acf Mon Sep 17 00:00:00 2001 From: Michael Howell Date: Wed, 16 Sep 2020 23:30:20 -0700 Subject: [PATCH] Fix doctest --- core/lib/src/shutdown.rs | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/core/lib/src/shutdown.rs b/core/lib/src/shutdown.rs index 52e8dbe4af..7b4fd9a1e1 100644 --- a/core/lib/src/shutdown.rs +++ b/core/lib/src/shutdown.rs @@ -1,4 +1,5 @@ use crate::request::{FromRequest, Outcome, Request}; +use futures::future::{FutureExt, Future, FusedFuture}; use tokio::sync::broadcast; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -56,18 +57,21 @@ impl Shutdown { /// so that the responder can short-circuit a long-running operation. /// /// ```rust,no_run + /// # #[macro_use] extern crate rocket; + /// # use rocket::Shutdown; + /// # /// use futures::{future, select}; /// #[get("/shutdown")] /// async fn wait_for_shutdown(handle: Shutdown) -> &'static str { - /// let shutdown_future = handle.wait(); - /// let long_running_operation = future::pending::<()>(); - /// let result = select! { + /// let mut shutdown_future = handle.wait(); + /// let mut long_running_operation = future::pending::<()>(); + /// select! { /// _ = shutdown_future => "shutting down...", /// _ = long_running_operation => "complete ok", - /// }.await + /// } /// } /// ``` - pub async fn wait(self) { + pub fn wait(self) -> impl Future + Unpin + FusedFuture { // This uses four events: // // * the store event @@ -85,10 +89,24 @@ impl Shutdown { // before the subscribe, then it must have also come before the load, which means that the load will // pick up on the atomic change. If the subscribe came before the store, then it also came before // the send, meaning that the broadcast channel will kick us out instead. - let mut recv = self.0.subscribe(); - if !self.1.load(Ordering::SeqCst) { - let _ = recv.recv().await; - } + + // To be useful in futures::select!, this future must be Unpin, + // which, because it holds a reference to its channel, basically means + // it has to be boxed. + let Shutdown(chan, flag) = self; + tokio::task::spawn(async move { + let mut recv = chan.subscribe(); + if !flag.load(Ordering::SeqCst) { + // Ignore errors, because if it fails to recv, + // then that means the sender has been dropped, + // which means that the system is shutting down. + recv.recv().await.unwrap_or(()) + } + // Ignore join errors, because if join fails, then that means the task + // either panicked, or was cancelled. It doesn't get cancelled, + // and if it panicked, then that means the channel was already + // dropped and we're shutting down anyway. + }).map(|e| e.unwrap_or(())).fuse() } }