I/O streams need to be able to read and write simultaneously #11165
Comments
I am still not entirely convinced that this is the correct way to go about this. I would rather explore other pathways first, such as an I/O `select`. The good thing about `select` is that it wouldn't be specific to TCP streams. This is definitely a problem that needs to be fixed before 1.0 (so I'm nominating this), but I want to approach this carefully and avoid just blindly implementing a solution which won't extend very well into the future.
@alexcrichton are you working on a solution? I would like to help to resolve this issue quickly as my project is blocked by this :)
Updated title to reflect that the solution isn't known.
Assigning to P-high.
I stumbled across the same issue today, while I was starting to implement a websocket library for Rust. I guess this state makes it impossible to implement any protocols that do not follow a strict request/response pattern. I basically see the same possible solutions as you.
For those of you looking for a work-around in the mean time, the following seems to work for me. Say you have a TcpStream called `tcp`:

```rust
use std::sync::arc::UnsafeArc;

let (tcp_send_arc, tcp_recv_arc) = UnsafeArc::new2(tcp);
```

Then you can use each arc from its own task:

```rust
do spawn {
    unsafe {
        let tcp_recv_ptr = tcp_recv_arc.get();
        loop {
            // Then you could use the stream like you would, using (*tcp_recv_ptr)
            let bytes = (*tcp_recv_ptr).read_to_end();
            // ...
        }
    }
}
```

The thing here is that the underlying `TcpStream` is shared rather than copied, so the usual safety guarantees no longer apply; use with care.
Having `select` work over I/O as well would cover my use case too. Is this likely to happen? And we are definitely all talking about this `select`, right?
The problem with your solution is that you can't close the socket, so your connection will persist unless the remote side closes the connection. I started experimenting yesterday evening with how a `select` for I/O could work. But then there's the question about the API design for sockets and other I/O. Should the user be able to register for I/O readiness and then perform the read himself, or should the operation run in the background?

Another idea would be to return IDs when the async operation is started and have the selector report the ID of the operation that finished. What is obvious for me is that when the IoObject gets destructed it should be automatically deregistered from the selector.
@derekchiang I'm not convinced that approach is safe. When using libgreen, TcpStream is presumably implemented on top of libuv, and I have no idea whether the libuv data is safe to be accessed from multiple threads.
@kballard When using libgreen, the scheduler should guarantee that all calls to the libuv streams are done from the (single) native thread which owns the associated libuv event loop. Therefore I think that the threading is no problem. However, the scheduler and libuv wrapper implementations are probably not designed to support parallel requests on the same libuv streams, so it will still be pretty much undefined what happens.
Just to clarify something: the proposed `select` would let me write something like this, right?

```rust
// Do things up here to set up processor tasks.
let request_processor_ports = [...];
let socket = bind_listen_and_accept();

while true {
    let selected = select(socket, request_processor_ports);
    if (selected == socket) {
        available_request_processor.process(socket.read());
    } else {
        let msg = selected.read();
        socket.write(msg);
    }
}
```

Am I misunderstanding the proposal for select? Sorry if I botched the syntax a bit.

Edit: as a follow-up, would there be a way to do this entirely without tasks?
Basically you won't need extra tasks. You would select directly on the TcpStream somehow.
I have started working on a solution and can tell you what my current API looks like, but it's far from finished.

```rust
// Create a selector, which is a new object
let selector = Selector::new().unwrap();

// First create a TcpStream object.
// This is in my current implementation quite the same as the native I/O ip stream
// - a very thin wrapper on top of the FD. It provides the same read and write methods.
let mut nativeStream = TcpStream::connect(socketaddr).unwrap();
nativeStream.set_blocking(false); // Switch to non-blocking I/O if desired

// This is new. It "upgrades" the TcpStream into a SelectableTcpStream object by consuming it.
// This class features additional async read/write methods that will be performed by using the
// associated selector.
// I like that approach quite a lot because if you don't want async I/O you don't need to use it.
let mut selectableStream = nativeStream.associate_selector(&selector);
// By performing selectableStream.disassociate_selector() the original TcpStream can be restored.
// This will only work when no async I/O operation is pending.

// Start an async I/O operation.
// In contrast to the existing APIs the operation needs an owned buffer
// in order to manipulate it in the background.
let handle = selectableStream.read_some_async(
    ByteBuffer { buffer: ~[0, ..100], offset: 0, length: 20 }
);
// handle is an IoResult<uint>, so either an IoError or a uint that serves as a handle.

// Query the selector for finished I/O when required.
let result = selector.wait();
// This blocks until one of the started async operations finishes. It also returns an IoResult<uint>
// which, in case of success, contains the handle of the operation that finished.

// In case the handles match, the result of the async operation can be retrieved.
match (handle, result) {
    (Ok(h1), Ok(h2)) if h1 == h2 => {
        let result = selectableStream.end_read_async();
        // This returns either an IoError or the result of the operation, which consists of the
        // ByteBuffer that was passed at the start of the operation and the number of bytes read.
    },
    ...
}
```

While an async read is in progress no other sync or async read can be started.

So that's basically my current state, which I have about 60% implemented using native I/O on Linux. What I like is the association and disassociation of the selector. Another option I considered looks like this:

```rust
let pending_op = selectableStream.read_some_async(
    ByteBuffer { buffer: ~[0, ..100], offset: 0, length: 20 }
).unwrap();

let result = selector.wait().unwrap();
if result.pending_op == pending_op {
    let read_result = pending_op.end_read_async();
}
```

So the end methods would be called directly on the handle that is returned. But returning complex handles probably leads to very complex lifetime semantics. Some things that are also not that easy to decide and to implement are what should happen if the user closes the socket, or if it gets destructed, while an operation is pending. Should the selector still return the aborted op or not?
@Matthias247, that sounds promising! I'm not quite understanding how everything works right now, and I'm not sure how async reads play into all this (and how they work in general). For me, I would start designing all this with an understanding of what primitives we actually have to deal with:

* epoll on Linux
* kqueue on OSX and the BSDs
* plain poll/select as the portable fallback

That's what I know of; I don't really know what windows has to offer on this front. Do you know of any good docs I can read up on about the windows APIs? I very much like the idea of a `Selector`. I don't think that you necessarily need to consume the I/O handle, but rather just take a mutable loan on it to prevent further usage; you would regain usage when you drop the select handle returned to you. Regardless, something along these lines sounds like a good idea. All in all, this sounds really promising!
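For context on what such a readiness-based selector looks like in practice, here is a minimal sketch using the `mio` crate, the readiness abstraction the Rust ecosystem later settled on over epoll/kqueue/IOCP. The address, buffer size, and token name are illustrative, and mio 0.8's API is assumed:

```rust
// Cargo.toml (assumed): mio = { version = "0.8", features = ["os-poll", "net"] }
use std::io::{ErrorKind, Read};
use std::net::SocketAddr;

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};

const STREAM: Token = Token(0);

fn main() -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(16);

    let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
    let mut stream = TcpStream::connect(addr)?; // non-blocking connect
    poll.registry()
        .register(&mut stream, STREAM, Interest::READABLE | Interest::WRITABLE)?;

    loop {
        // One blocking wait covers every registered source.
        poll.poll(&mut events, None)?;
        for event in events.iter() {
            if event.token() == STREAM && event.is_readable() {
                let mut buf = [0u8; 4096];
                loop {
                    match stream.read(&mut buf) {
                        Ok(0) => return Ok(()), // peer closed the connection
                        Ok(n) => println!("read {} bytes", n),
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, // drained for now
                        Err(e) => return Err(e),
                    }
                }
            }
        }
    }
}
```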
I would really like the ability to select on both I/O and Chans simultaneously. I also need to be able to wrap the `TcpStream` in a buffered reader. @Matthias247 Assuming your proposal does not allow for selecting on Chans as well, and it doesn't appear to, then the only alternative is to spawn an extra task in order to provide a Chan-based interface to the stream. This is less than ideal, especially when using libnative. And I need to be able to buffer the stream because I need a line-based interface and that's what `BufferedReader` provides.
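The spawn-an-extra-task workaround described above can be sketched with today's standard library: a reader thread owns one handle to the stream and forwards lines over a channel, while the writer keeps the other handle. The address and the `PING` payload are illustrative:

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;

fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8000")?;
    let reader_stream = stream.try_clone()?; // second handle for the reader task

    // The reader task owns the reading side and turns incoming lines into channel messages.
    let (tx, rx) = mpsc::channel::<String>();
    thread::spawn(move || {
        let mut reader = BufReader::new(reader_stream);
        let mut line = String::new();
        loop {
            line.clear();
            match reader.read_line(&mut line) {
                Ok(0) | Err(_) => break, // EOF or error: stop forwarding
                Ok(_) => {
                    if tx.send(line.clone()).is_err() {
                        break; // receiver went away
                    }
                }
            }
        }
    });

    // The writer keeps the original handle and can send at any time,
    // while incoming lines are consumed from the channel.
    let mut writer = stream;
    writer.write_all(b"PING\r\n")?;
    for msg in rx {
        println!("line from socket: {}", msg.trim_end());
    }
    Ok(())
}
```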
@alexcrichton could you explain how that would be made generic? Edit: initial comment about being generic was here: #11165 (comment)
@kballard selecting over I/O and chans is pretty difficult; the only reasonable solution I know of is to convert a chan into something the I/O layer can wait on. @sw17ch You'd only need to implement the underlying interface once.
@alexcrichton Cocoa's run loops are prior art worth looking at here.
Something like a run loop, perhaps.
@alexcrichton What do you mean? Multiple sources in one loop?
@alexcrichton If libnative gets a good async-capable I/O implementation, that could also be the basis for the libgreen main threads instead of using uv. However, that's probably a far-future vision. @kballard Of course the target is to get this also running on Channels. And on Timers. Probably not on files.
@sw17ch What I like better about the other solution is that a split TcpStream alone still does not solve things like receiving data and listening to a timer in parallel.
Ah, good point. It would be nice to have a better 'event' API in general, I suppose.
In the meantime I'm one step further. I had another idea that I decided to try. It's basically a quite old (aka rusty) concept: a mixture of what the Qt or AS3 networking APIs and the Windows wndproc look like. But as code says more than a thousand words, I'll simply start with an example that I have running (on Linux):

```rust
fn main() {
    // Create an event queue. That's the central element instead of Select
    let mut ev_queue = EventQueue::new();

    // Create a port/chan pair. Same as the Rust std API, but different in implementation
    let (port, chan): (Port<~str>, Chan<~str>) = Chan::new();
    // Upgrade the port to a selectable port. Still need another name for that
    let mut selport = SelectablePort::from_port(port, &ev_queue);

    // Create an event based timer
    let mut main_timer = Timer::create(&ev_queue).unwrap();
    main_timer.set_interval(2000);
    main_timer.start();

    // Start a subtask that communicates with us
    do native::task::spawn() {
        subtask(chan);
    }

    loop {
        // Wait for events to arrive
        let event = ev_queue.next_event().unwrap();
        // Look at the origin and the type and then decide what to do
        if event.originates_from(selport) {
            match event.event_type {
                event::ChannelMessageEvent => {
                    // We know that we received a message and can fetch it in a nonblocking style
                    let msg = selport.recv().unwrap();
                    println!("Message from subtask: {}", msg);
                },
                event::ChannelClosedEvent => {
                    // Oh, the channel seems dead now
                    println!("Subtask closed");
                    return;
                },
                _ => ()
            }
        }
        else if event.originates_from(main_timer) {
            // The timer sent a tick event
            println!("main_timer::tick()");
        }
    }
}

fn subtask(chan: Chan<~str>) {
    // The other task also gets an event queue and a timer
    let mut ev_queue = EventQueue::new();
    let mut sub_timer = Timer::create(&ev_queue).unwrap();
    let mut iterations = 3;
    let mut stream_alive = false;

    // We will play with TCP here, so let's connect to somewhere. This is currently blocking
    let opt_ipaddr: Option<IpAddr> = FromStr::from_str("192.168.1.99");
    let socketaddr = SocketAddr { ip: opt_ipaddr.unwrap(), port: 8000 };
    let mut rawstream = TcpStream::connect(socketaddr).unwrap();
    let mut stream = SelectableTcpStream::from_tcp_stream(rawstream, &ev_queue);
    stream_alive = true;

    // Start a timer
    sub_timer.set_interval(3000);
    sub_timer.start();

    // Send a request. This is also currently blocking
    let request = ~"GET /index.html HTTP/1.1\r\n\r\n";
    stream.write(request.as_bytes());
    iterations -= 1;

    loop {
        // Fetch events and check what to do
        let event = ev_queue.next_event().unwrap();
        if event.originates_from(sub_timer) {
            // Send something to the main thread.
            chan.send(~"subtimer::tick()");
            if !stream_alive {
                if iterations > 0 {
                    iterations -= 1;
                    // Create a new stream. The old one will be killed through RAII here
                    rawstream = TcpStream::connect(socketaddr).unwrap();
                    stream = SelectableTcpStream::from_tcp_stream(rawstream, &ev_queue);
                    stream_alive = true;
                    stream.write(request.as_bytes());
                }
                else {
                    return;
                }
            }
        }
        else if event.originates_from(stream) {
            match event.event_type {
                event::StreamClosedEvent => {
                    // Oops, the TCP connection was closed by the remote
                    chan.send(~"TCP connection closed");
                    stream_alive = false;
                },
                event::DataAvailableEvent(nr_bytes) => {
                    // Yay, we know that we received at least nr_bytes and can "safely" read them in a nonblocking fashion
                    let mut buffer: ~[u8] = std::vec::from_elem::<u8>(nr_bytes, 0);
                    let read_res = stream.read(buffer);
                    match read_res {
                        Err(err) => {
                            chan.send(err.desc.to_owned());
                        }
                        Ok(nr_read) => {
                            let txt = std::str::from_utf8(buffer.slice(0, nr_read));
                            chan.send(txt.to_owned());
                        }
                    }
                },
                _ => ()
            }
        }
    }

    // Stop my I/O performing objects. This will also be done in RAII style if you forget it.
    sub_timer.stop();
    stream.close();
}
```

Ooh, and of course the glorious output:
Ok, so after the code example, some more info about it.

**The concept**

As you can see, the whole asynchronous functionality is built around events which are dispatched from an event queue. The events notify the user that something has happened, is ready, or was finished. An event is something quite simple and lightweight:

```rust
pub struct Event
{
    event_type: EventKind, // The type of the event which occurred and all necessary info
    is_valid: bool,        // Internally used
    source: Rc<bool>       // Source of the event. The type is not relevant for the user
}

// All types of events that are known. This is what I used in the examples, but there would be many more.
// Rust's enums are really a great way to describe different kinds of events and their associated data
pub enum EventKind
{
    StreamClosedEvent,
    IoErrorEvent(IoError),
    DataAvailableEvent(uint),
    TimerEvent,
    ChannelClosedEvent,
    ChannelMessageEvent,
    ...,
    SignalReceived(uint),
    DnsQueryResolved(),
    PacketReceived(uint)
}
```

That makes the API somewhat similar to the event models mentioned above. If you wanted a callback style instead, the Event could also carry a callback and some user data:

```rust
pub struct Event
{
    ...
    callback_fn: fn(ev: &Event),
    user_data: ~Any
}
```

Then you could build a callback-style event loop as simply as:

```rust
loop {
    // Wait for events to arrive
    let event = ev_queue.next_event().unwrap();
    event.callback_fn(event);
}
```

Things that are also relevant to me:
**The implementation**

I currently only have a Linux 1:1 implementation, but also have some ideas of how it would work in other environments. The hardest thing to get right is the sockets; the APIs differ very much here. I decided to give the user a high-level view of what happened, e.g. directly whether and how many bytes are readable, whether the connection was closed, or which error occurred. It might not be the most performant approach, but all OSes can provide the information in some way and it gives an easy to understand API to the user.

A question that also arises is what should happen if the application calls `read` on its own, outside of the events.

I also decided to automatically close connections after the first error, so that users don't get further spurious errors. Writes are currently blocking because that's the easiest thing to implement: you don't have to care about how to keep the data-to-send alive until a background write has finished. That's probably suboptimal for applications which send lots of data (which would cause the socket buffer to fill and thereby block), and also suboptimal if you would use it as a base for an M:N scheduler.
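As an aside, the core of this event-queue design can be sketched with today's standard library by letting one channel play the role of the queue, with a timer thread and a socket-reader thread as event sources. All names, intervals, and the address here are illustrative:

```rust
use std::io::Read;
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// The kinds of events the loop understands, mirroring the EventKind idea above.
enum Event {
    Tick,
    Data(Vec<u8>),
    StreamClosed,
}

fn main() -> std::io::Result<()> {
    let (tx, rx) = mpsc::channel::<Event>();

    // Timer source: sends a Tick every two seconds.
    let timer_tx = tx.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(2));
        if timer_tx.send(Event::Tick).is_err() {
            break;
        }
    });

    // Socket source: forwards received bytes as Data events.
    let mut stream = TcpStream::connect("127.0.0.1:8000")?;
    thread::spawn(move || {
        let mut buf = [0u8; 4096];
        loop {
            match stream.read(&mut buf) {
                Ok(0) | Err(_) => {
                    let _ = tx.send(Event::StreamClosed);
                    break;
                }
                Ok(n) => {
                    if tx.send(Event::Data(buf[..n].to_vec())).is_err() {
                        break;
                    }
                }
            }
        }
    });

    // The single event loop: one blocking wait, many sources.
    for event in rx {
        match event {
            Event::Tick => println!("timer tick"),
            Event::Data(bytes) => println!("got {} bytes", bytes.len()),
            Event::StreamClosed => break,
        }
    }
    Ok(())
}
```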
This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version.

The initial idea I had was to literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnecessary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name.

The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time.

The cloning support is pretty specific per platform/lib combination:

* native/win32 - uses some specific WSA apis to clone the SOCKET handle
* native/unix - uses dup() to get another file descriptor
* green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed.

I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
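In today's standard library this cloning approach survives as `TcpStream::try_clone`; a minimal sketch of one thread reading while another writes on the same connection (address and payload illustrative):

```rust
use std::io::{Read, Write};
use std::net::TcpStream;
use std::thread;

fn main() -> std::io::Result<()> {
    let mut write_half = TcpStream::connect("127.0.0.1:8000")?;
    let mut read_half = write_half.try_clone()?; // same socket, second handle

    // The reader runs concurrently with the writer below.
    let reader = thread::spawn(move || -> std::io::Result<()> {
        let mut buf = [0u8; 4096];
        loop {
            let n = read_half.read(&mut buf)?;
            if n == 0 {
                return Ok(()); // connection closed
            }
            println!("received {} bytes", n);
        }
    });

    // Writing does not have to wait for the read that is blocked above.
    write_half.write_all(b"hello\r\n")?;

    reader.join().unwrap()?;
    Ok(())
}
```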
@Matthias247 @dwrensha @kballard I also like the single-reader-on-socket model, but only with green threads. Does the current API require the use of a native thread for the socket? I didn't read that into the implementation, but didn't look too hard. Is there something about the current model that doesn't behave nicely (other than the possibility of leaking the socket)?
@sw17ch The current implementation does not require native threads for sockets. When using libgreen, sockets will go through libuv. The problem is that if you're writing library code, it can't control which threading model it's invoked in, and depending on your API you may not have the luxury of spinning up a libgreen thread pool for your socket tasks.
@kballard ah, thank you. This clears things up for me a lot.
Two new methods were added to TcpStream and UnixStream:

```rust
fn close_read(&mut self) -> IoResult<()>;
fn close_write(&mut self) -> IoResult<()>;
```

These two methods map to shutdown()'s behavior (the system call on unix), closing the reading or writing half of a duplex stream. These methods are primarily added to allow waking up a pending read in another task. By closing the reading half of a connection, all pending readers will be woken up and will return with EndOfFile. The close_write() method was added for symmetry with close_read(), and I imagine that it will be quite useful at some point.

Implementation-wise, librustuv got the short end of the stick this time. The native versions just delegate to the shutdown() syscall (easy). The uv versions can leverage uv_shutdown() for tcp/unix streams, but only for closing the writing half. Closing the reading half is done through some careful dancing to wake up a pending reader.

As usual, windows likes to be different from unix. The windows implementation uses shutdown() for sockets, but shutdown() is not available for named pipes. Instead, CancelIoEx was used with some fancy synchronization to make sure everyone knows what's up.

cc #11165
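The modern counterpart of close_read/close_write is `TcpStream::shutdown` with `Shutdown::Read`/`Shutdown::Write`. A hedged sketch of using it to wake up a reader blocked in another thread follows; the exact behavior of shutting down the read half locally is somewhat platform-dependent, and the address is illustrative:

```rust
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::thread;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8000")?;
    let mut read_half = stream.try_clone()?;

    let reader = thread::spawn(move || {
        let mut buf = [0u8; 1024];
        // Blocks until data arrives, the peer closes, or the read side is shut down.
        let n = read_half.read(&mut buf);
        println!("read returned: {:?}", n);
    });

    thread::sleep(Duration::from_secs(1));
    // Shut down the reading half; on most platforms the pending read above then
    // returns 0 (end of file), which is exactly what close_read() was added for.
    stream.shutdown(Shutdown::Read)?;

    reader.join().unwrap();
    Ok(())
}
```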
Nominating for closure. We're in a much better state than when this was opened:

* TcpStream and UnixStream can now be cloned, so one task can read while another writes on the same connection.
* close_read() and close_write() were added to shut down half of a duplex stream, which can be used to wake up a reader blocked in another task.

I believe that this set of functionality encompasses the intent of this issue.
@alexcrichton I believe that what you have listed does meet the original intent of this issue. But besides the ability to read/write simultaneously, there's a need for the ability to select over multiple I/O channels at once (or over the reading and writing halves of the same channel). Perhaps that should be covered under a separate ticket, but there was a lot of discussion in here already about doing just that.
Closing! Yay!
👏
A `TcpStream` is conceptually two distinct parts: a reader and a writer. In theory you should be able to write data while in the middle of being blocked trying to read. But a `TcpStream` cannot be shared between tasks, so there is no way to write code to do this.

To that end, `TcpStream` should have a `split()` method that returns `(TcpWriter, TcpReader)`, which each implement one half of the `TcpStream`. This would allow each half to be sent to a separate task.

One practical reason for this is to be able to wrap a `TcpStream` in a pair of `(Port<~[u8]>, Chan<~[u8]>)`s. This requires a split stream and two tasks.
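For what it's worth, the `split()` shape proposed here is essentially what later async runtimes shipped; a sketch of the same idea using tokio's `TcpStream::into_split` (tokio assumed as a dependency, address and payload illustrative):

```rust
// Cargo.toml (assumed): tokio = { version = "1", features = ["full"] }
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8000").await?;
    // Exactly the proposed shape: independently owned read and write halves.
    let (mut reader, mut writer) = stream.into_split();

    let read_task = tokio::spawn(async move {
        let mut buf = [0u8; 4096];
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                return Ok::<_, std::io::Error>(()); // peer closed
            }
            println!("received {} bytes", n);
        }
    });

    // The write half can be used while the read above is still pending.
    writer.write_all(b"hello\r\n").await?;

    read_task.await.unwrap()?;
    Ok(())
}
```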