Skip to content

Commit

Permalink
feat: Add sink::unfold
Browse files Browse the repository at this point in the history
A more general function than `from_fn` (rust-lang#2254), as suggested in
rust-lang#2254 (comment)
  • Loading branch information
Markus Westerlind authored and 2020zhy committed Dec 8, 2020
1 parent 0f4d8cf commit 139b595
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 37 deletions.
88 changes: 52 additions & 36 deletions futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
//! This module is only available when the `sink` feature of this
//! library is activated, and it is activated by default.
use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
use crate::future::Either;

#[cfg(feature = "compat")]
use crate::compat::CompatSink;
Expand Down Expand Up @@ -41,6 +41,9 @@ pub use self::send::Send;
mod send_all;
pub use self::send_all::SendAll;

mod unfold;
pub use self::unfold::{unfold, Unfold};

mod with;
pub use self::with::With;

Expand Down Expand Up @@ -69,10 +72,11 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::map`.
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
where F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized
where
F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized,
{
With::new(self, f)
}
Expand Down Expand Up @@ -110,9 +114,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// # });
/// ```
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
where F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized
where
F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized,
{
WithFlatMap::new(self, f)
}
Expand All @@ -133,8 +138,9 @@ pub trait SinkExt<Item>: Sink<Item> {

/// Transforms the error returned by the sink.
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
where F: FnOnce(Self::Error) -> E,
Self: Sized,
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
SinkMapErr::new(self, f)
}
Expand All @@ -143,13 +149,13 @@ pub trait SinkExt<Item>: Sink<Item> {
///
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
where Self: Sized,
Self::Error: Into<E>,
where
Self: Sized,
Self::Error: Into<E>,
{
SinkErrInto::new(self)
}


/// Adds a fixed-size buffer to the current sink.
///
/// The resulting sink will buffer up to `capacity` items when the
Expand All @@ -164,14 +170,16 @@ pub trait SinkExt<Item>: Sink<Item> {
/// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
where Self: Sized,
where
Self: Sized,
{
Buffer::new(self, capacity)
}

/// Close the sink.
fn close(&mut self) -> Close<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Close::new(self)
}
Expand All @@ -181,9 +189,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter clones each incoming item and forwards it to both this as well as
/// the other sink at the same time.
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
where Self: Sized,
Item: Clone,
Si: Sink<Item, Error=Self::Error>
where
Self: Sized,
Item: Clone,
Si: Sink<Item, Error = Self::Error>,
{
Fanout::new(self, other)
}
Expand All @@ -193,7 +202,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter is intended to be used when you want to stop sending to the sink
/// until all current requests are processed.
fn flush(&mut self) -> Flush<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Flush::new(self)
}
Expand All @@ -205,7 +215,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// to batch together items to send via `send_all`, rather than flushing
/// between each item.**
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Send::new(self, item)
}
Expand All @@ -221,12 +232,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Doing `sink.send_all(stream)` is roughly equivalent to
/// `stream.forward(sink)`. The returned future will exhaust all items from
/// `stream` and send them to `self`.
fn send_all<'a, St>(
&'a mut self,
stream: &'a mut St
) -> SendAll<'a, Self, St>
where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
{
SendAll::new(self, stream)
}
Expand All @@ -237,8 +246,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `right_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn left_sink<Si2>(self) -> Either<Self, Si2>
where Si2: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si2: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Left(self)
}
Expand All @@ -249,8 +259,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `left_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn right_sink<Si1>(self) -> Either<Si1, Self>
where Si1: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si1: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Right(self)
}
Expand All @@ -260,39 +271,44 @@ pub trait SinkExt<Item>: Sink<Item> {
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
where Self: Sized + Unpin,
where
Self: Sized + Unpin,
{
CompatSink::new(self)
}

/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
/// sink types.
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_ready(cx)
}

/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
/// sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).start_send(item)
}

/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
/// sink types.
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_flush(cx)
}

/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
/// sink types.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}
Expand Down
83 changes: 83 additions & 0 deletions futures-util/src/sink/unfold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use core::{future::Future, pin::Pin};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project::pin_project;

/// Sink for the [`unfold`] function.
#[pin_project]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Unfold<T, F, R> {
state: Option<T>,
function: F,
#[pin]
future: Option<R>,
}

/// Create a sink from a function which processes one item at a time.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
/// async move {
/// sum += i;
/// eprintln!("{}", i);
/// Ok::<_, futures::never::Never>(sum)
/// }
/// });
/// futures::pin_mut!(unfold);
/// unfold.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
Unfold {
state: Some(init),
function,
future: None,
}
}

impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
type Error = E;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut this = self.project();
debug_assert!(this.future.is_none());
let future = (this.function)(this.state.take().unwrap(), item);
this.future.set(Some(future));
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
let result = match ready!(future.poll(cx)) {
Ok(state) => {
*this.state = Some(state);
Ok(())
}
Err(err) => Err(err),
};
this.future.set(None);
result
} else {
Ok(())
})
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ pub mod sink {

pub use futures_util::sink::{
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
SinkExt, Fanout, Drain, drain,
SinkExt, Fanout, Drain, drain, Unfold, unfold,
WithFlatMap,
};

Expand Down
38 changes: 38 additions & 0 deletions futures/tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,44 @@ fn sink_map_err() {
);
}

#[test]
fn sink_unfold() {
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::sink::{self, Sink, SinkExt};
use futures::task::Poll;

block_on(poll_fn(|cx| {
let (tx, mut rx) = mpsc::channel(1);
let unfold = sink::unfold((), |(), i: i32| {
let mut tx = tx.clone();
async move {
tx.send(i).await.unwrap();
Ok::<_, String>(())
}
});
futures::pin_mut!(unfold);
assert_eq!(unfold.as_mut().start_send(1), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
assert_eq!(rx.try_next().unwrap(), Some(1));

assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(2), Ok(()));
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(3), Ok(()));
assert_eq!(rx.try_next().unwrap(), Some(2));
assert!(rx.try_next().is_err());
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(4), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
assert_eq!(rx.try_next().unwrap(), Some(3));
assert_eq!(rx.try_next().unwrap(), Some(4));

Poll::Ready(())
}))
}

#[test]
fn err_into() {
use futures::channel::mpsc;
Expand Down

0 comments on commit 139b595

Please sign in to comment.