Can we simplify io::copy? #365

Closed
ghost opened this issue Oct 17, 2019 · 22 comments
Labels
api design Open design questions question/feedback A question or user feedback

Comments

@ghost

ghost commented Oct 17, 2019

The current signature of async_std::io::copy is:

pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where
    R: Read + Unpin + ?Sized,
    W: Write + Unpin + ?Sized;

Both reader and writer need to be mutable references because we're just mimicking std::io::copy.

Unfortunately, this API is annoying to use when we want to share TcpStreams. Just look at this convoluted &mut (&stream, &stream) pattern:

async fn process(stream: TcpStream) -> io::Result<()> {
    let (reader, writer) = &mut (&stream, &stream);
    io::copy(reader, writer).await?;
    Ok(())
}

I think we can do better. What if we didn't follow the API from std and had the following instead?

pub async fn copy<R, W>(reader: R, writer: W) -> io::Result<u64>
where
    R: Read + Unpin,
    W: Write + Unpin;

This might look like a less powerful API than the previous one, but I believe it is functionally the same. Note that we have these blanket impls of Read and Write:

impl<T: Read + Unpin + ?Sized> Read for &mut T {}
impl<T: Write + Unpin + ?Sized> Write for &mut T {}

That means if we can pass &mut T into the previous API, then it should be totally fine to pass it into the new one too! In other words, the new API is fully compatible with the previous one (unless I'm missing something here).
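
The compatibility argument can be checked with a small sketch. It uses the standard library's blocking Read and Write traits as a stand-in for the async ones, on the assumption that the blanket impls for &mut T behave the same way in both; copy_by_value is a hypothetical name and not part of async-std.

```rust
use std::io::{self, Read, Write};

/// Hypothetical by-value variant of `copy`, written against std's blocking
/// traits (an assumption made so the sketch runs without an async runtime).
fn copy_by_value<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<u64> {
    io::copy(&mut reader, &mut writer)
}

fn main() -> io::Result<()> {
    let mut input: &[u8] = b"hello";
    let mut output: Vec<u8> = Vec::new();

    // Old call style: `&mut R` and `&mut W` satisfy the by-value bounds
    // through the blanket impls for mutable references.
    let n = copy_by_value(&mut input, &mut output)?;
    assert_eq!(n, 5);
    assert_eq!(output, b"hello");

    // New call style: owned values are accepted as well.
    let n = copy_by_value(&b"abc"[..], io::sink())?;
    assert_eq!(n, 3);
    Ok(())
}
```

Both call styles compile against the same signature, which is the sense in which the by-value API should accept everything the by-reference one does.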

So the cool thing is that while it might seem we're deviating from the std APIs, we kind of aren't. :)

Here's how we can write an echo TCP server using the new async_std::io::copy:

async fn process(stream: TcpStream) -> io::Result<()> {
    io::copy(&stream, &stream).await?;
    Ok(())
}

And here's how we do it on Arc<TcpStream>:

async fn process(stream: Arc<TcpStream>) -> io::Result<()> {
    io::copy(&*stream, &*stream).await?;
    Ok(())
}

This change could make sharing streams a lot easier, and we've seen plenty of people struggle with this problem. Wdyt?
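
For readers wondering why copy(&stream, &stream) can type-check at all, here is a rough sketch of the underlying pattern: the traits are implemented for a shared reference, and the mutation happens behind interior mutability. FakeStream, copy_by_value and echo are hypothetical names, and std's blocking traits are used in place of the async ones so the example runs on its own.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::sync::Mutex;

/// Hypothetical in-memory stand-in for a socket: bytes to be read sit in
/// `inbound`, bytes that were written end up in `outbound`.
struct FakeStream {
    inbound: Mutex<VecDeque<u8>>,
    outbound: Mutex<Vec<u8>>,
}

impl FakeStream {
    fn with_input(data: &[u8]) -> Self {
        FakeStream {
            inbound: Mutex::new(data.iter().copied().collect()),
            outbound: Mutex::new(Vec::new()),
        }
    }

    /// Returns a copy of everything written so far.
    fn written(&self) -> Vec<u8> {
        self.outbound.lock().unwrap().clone()
    }
}

// Implementing the traits for `&FakeStream` is what lets one value act as
// reader and writer at once; the mutation happens behind the mutexes.
impl Read for &FakeStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inbound.lock().unwrap().read(buf)
    }
}

impl Write for &FakeStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.outbound.lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical by-value `copy`, mirroring the proposed signature.
fn copy_by_value<R: Read, W: Write>(mut reader: R, mut writer: W) -> io::Result<u64> {
    io::copy(&mut reader, &mut writer)
}

/// Echoes everything readable from the stream back into the same stream.
fn echo(stream: &FakeStream) -> io::Result<u64> {
    copy_by_value(stream, stream)
}

fn main() -> io::Result<()> {
    let stream = FakeStream::with_input(b"ping");
    assert_eq!(echo(&stream)?, 4);
    assert_eq!(stream.written(), b"ping".to_vec());
    Ok(())
}
```

Because &FakeStream is Copy, the same reference can be handed over twice, which is what removes the need for the &mut (&stream, &stream) tuple.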

@ghost ghost added question/feedback A question or user feedback api design Open design questions labels Oct 17, 2019
@ghost
Author

ghost commented Oct 17, 2019

cc @skade @yoshuawuyts

@ghost
Author

ghost commented Oct 17, 2019

I believe we could make the same change in the standard library. scratches head

@yoshuawuyts
Contributor

@stjepang I'd be in favor of trying this out — it seems you've thought this through and this seems good. It might be interesting to open an issue for this in std also!

@skade
Collaborator

skade commented Oct 18, 2019

I think the main motivation behind forcibly taking a reference is to show intended use; I don't see much use for an API where both parameters are passed owned if they are streams.

I'm not sure if this is a good motivation for such an API, as accidentally passing the stream owned will be caught by the borrow checker in all cases where it is a problem. I'm fine with trying out this change.

@ghost
Author

ghost commented Nov 7, 2019

It's interesting how the futures crate only went halfway there:

pub fn copy<R, W>(reader: R, writer: &mut W) -> Copy<R, W> where
    R: AsyncRead,
    W: AsyncWrite + Unpin + ?Sized;

Source: https://docs.rs/futures/0.3.0/futures/io/fn.copy.html

So reader is owned, but writer still isn't. This still doesn't make the pattern we want possible but makes some others a bit easier.

@ghost
Author

ghost commented Nov 7, 2019

What are your feelings on this? People are repeatedly complaining about &mut (&stream, &stream) and I must admit, it is an odd pattern.

Tokio has the .split() method that returns back a reader and writer pair, but that's a lot of API surface which doesn't exist in the standard library.

With the change proposed here, everything that works in std would also work here, except we'd also be a bit more general and accept things like io::copy(&stream, &stream).

@yoshuawuyts
Contributor

yoshuawuyts commented Nov 7, 2019

Still happy to try this change out!

edit: we should probably mark it as "unstable" though, at least for one release cycle to allow us to change our minds.

@gsquire

gsquire commented Nov 15, 2019

> Tokio has the .split() method that returns back a reader and writer pair, but that's a lot of API surface which doesn't exist in the standard library.

I saw another issue mentioning a try_clone method for TcpStream but the author solved it in another way. I think it would be nice to have a function that can "split" a TcpStream into a read half and a write half. My use case would be something like this snippet:

let conn = TcpStream::connect(&self.addr).await?;
let (r, w) = conn.split(); // Or some variation of this function.
self.reader = BufReader::new(r);
self.writer = BufWriter::new(w);

This way I can have a bi-directional flow between my client and server that is buffered.

@yoshuawuyts
Contributor

@gsquire you can put TcpStream inside of an Arc and it should mostly work the way you want it to already.

@gsquire

gsquire commented Nov 15, 2019

@yoshuawuyts I ran into borrowing issues since this was inside of a function that was making a new type. I'll re-evaluate and see where I end up. Maybe the bufstream crate will support async-std at some point.

@sebastiencs

> @gsquire you can put TcpStream inside of an Arc and it should mostly work the way you want it to already.

Could you elaborate, @yoshuawuyts?
I also want both a BufReader and a BufWriter from a TcpStream and I can't find a way to achieve this currently.
Having a try_clone or split method would be nice.

@yoshuawuyts
Contributor

I believe Arc::new(TcpStream::connect(addr).await?) should just about do what you want it to; you can then freely use the stream and clone it around. Because most methods use interior mutability, their signatures only take &self, so it's very flexible.

On phone now, so haven't had a chance to test this out. But afaik people have done this successfully before.

@sebastiencs

@yoshuawuyts The issue is that BufReader::new and BufWriter::new take their argument by value, not by reference.
So we can make either a BufReader or a BufWriter, but not both.

@ghost
Author

ghost commented Nov 16, 2019

This problem has a straightforward solution. :) We just need to implement Read and Write for Arc<TcpStream> and then you'll be able to do:

let conn = Arc::new(TcpStream::connect(&self.addr).await?);
self.reader = BufReader::new(conn.clone());
self.writer = BufWriter::new(conn);

@sebastiencs

@stjepang Is that possible with the orphan rules? Read, Write and Arc don't belong to async-std.

@yoshuawuyts
Contributor

yoshuawuyts commented Nov 18, 2019

edit: this is wrong, figured it out in #365 haha

@sebastiencs just tried this out, and it seems this is indeed not possible.

Target program

use async_std::io::{self, BufReader, BufWriter};
use async_std::net::TcpStream;
use async_std::sync::Arc;
use async_std::task;

fn main() -> io::Result<()> {
    task::block_on(async {
        let addr = "localhost:8080";
        let conn = Arc::new(TcpStream::connect(&addr).await?);
        let reader = BufReader::new(conn.clone());
        let writer = BufWriter::new(conn);
        Ok(())
    })
}

async-std impl

I added this to src/net/tcp/stream.rs

impl Read for std::sync::Arc<TcpStream> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        self.watcher.poll_read_with(cx, |mut inner| inner.read(buf))
    }
}

Error

error[E0117]: only traits defined in the current crate can be implemented for arbitrary types
   --> src/net/tcp/stream.rs:367:1
    |
367 | impl Read for std::sync::Arc<TcpStream> {
    | ^^^^^^^^^^^^^^-------------------------
    | |             |
    | |             `std::sync::Arc` is not defined in the current crate
    | impl doesn't use only types from inside the current crate
    |
    = note: define and implement a trait or new type instead
error: aborting due to previous error
For more information about this error, try `rustc --explain E0117`.
error: could not compile `async-std`.
To learn more, run the command again with --verbose.

@yoshuawuyts
Contributor

Oh, ugh nevermind. Figured it out; needed to do &* haha. Needed to trigger the deref, and then take ownership again. This works, no extra impls needed:

use async_std::io::{self, BufReader, BufWriter};
use async_std::net::TcpStream;
use async_std::sync::Arc;
use async_std::task;

fn main() -> io::Result<()> {
    task::block_on(async {
        let addr = "localhost:8080";
        let conn = Arc::new(TcpStream::connect(&addr).await?);
        let reader = BufReader::new(&*conn.clone());
        let writer = BufWriter::new(&*conn);
        Ok(())
    })
}

@ghost
Author

ghost commented Nov 18, 2019

Indeed, it seems coherence rules don't allow the impl for Arc<TcpStream> :(

@yoshuawuyts BufWriter::new(&*conn) works, but now BufWriter is constrained by the lifetime of this temporary reference, whereas we'd prefer BufWriter to take ownership of the Arc.
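
One way to let the buffered wrappers own the Arc, along the lines of the compiler's "define and implement a trait or new type instead" note, is a local newtype around the Arc. The following is only a sketch under assumptions: FakeStream, Shared and buffered_pair are hypothetical names, and std's blocking traits stand in for the async ones so the example can run without a network or runtime.

```rust
use std::collections::VecDeque;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a socket whose I/O goes through `&self`.
#[derive(Default)]
struct FakeStream {
    inbound: Mutex<VecDeque<u8>>,
    outbound: Mutex<Vec<u8>>,
}

/// Local newtype around the `Arc`. Because `Shared` is defined in this
/// crate, the orphan rule allows implementing foreign traits for it.
#[derive(Clone)]
struct Shared(Arc<FakeStream>);

impl Read for Shared {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.inbound.lock().unwrap().read(buf)
    }
}

impl Write for Shared {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.outbound.lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Builds an owning buffered reader/writer pair over one shared stream.
/// Neither half borrows from a local, so the pair can be returned freely.
fn buffered_pair(stream: FakeStream) -> (BufReader<Shared>, BufWriter<Shared>) {
    let conn = Shared(Arc::new(stream));
    (BufReader::new(conn.clone()), BufWriter::new(conn))
}

fn main() -> io::Result<()> {
    let stream = FakeStream::default();
    stream.inbound.lock().unwrap().extend(b"hello\n");

    let (mut reader, mut writer) = buffered_pair(stream);
    let mut line = String::new();
    reader.read_line(&mut line)?;
    writer.write_all(line.as_bytes())?;
    writer.flush()?;

    assert_eq!(line, "hello\n");
    assert_eq!(*writer.get_ref().0.outbound.lock().unwrap(), b"hello\n");
    Ok(())
}
```

The cost of this approach is an extra wrapper type that users have to know about, which is part of why a Clone impl on the stream itself may be the more convenient option.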

@ghost
Author

ghost commented Nov 18, 2019

Perhaps we should implement Clone for TcpStream?

let conn = TcpStream::connect(&addr).await?;
let reader = BufReader::new(conn.clone());
let writer = BufWriter::new(conn);

In the standard library we have TcpStream::try_clone(), which calls dup() on its file descriptor. The reason why this method is try_clone() is because dup() might fail.

I believe we could've also had a regular clone() method on std::net::TcpStream, but the standard library chose not to do this because the inner stream would then have to be stored inside an Arc.

However, in async-std, our TcpStream already contains an Arc<Entry>, so we probably shouldn't worry about the performance impact of having another Arc. I believe we could even introduce clone() without adding any extra performance penalties.

@yoshuawuyts
Contributor

@stjepang ah yeah that'd be great. Filed an issue for it here: #553

@yoshuawuyts
Contributor

This has been implemented; only thing left is stabilization now but we should track that separately.

@gsquire

gsquire commented Dec 12, 2019

Does this also close #553?

Edit: Oops, I forgot that was split off from the original ask. Disregard.
