Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Add close_{read,write}() methods to I/O #13743

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL, SHUT_RD};

pub use funcs::c95::ctype::{isalnum, isalpha, iscntrl, isdigit};
pub use funcs::c95::ctype::{islower, isprint, ispunct, isspace};
Expand Down
13 changes: 10 additions & 3 deletions src/libnative/io/file_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

//! Blocking posix-based file I/O

use std::sync::arc::UnsafeArc;
use libc::{c_int, c_void};
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use libc::{c_int, c_void};
use libc;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use io::{IoResult, retry, keep_going};

Expand Down Expand Up @@ -178,6 +178,13 @@ impl rtio::RtioPipe for FileDesc {
fn clone(&self) -> ~rtio::RtioPipe:Send {
~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe:Send
}

fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

impl rtio::RtioTTY for FileDesc {
Expand Down
7 changes: 4 additions & 3 deletions src/libnative/io/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ impl rtio::RtioTcpStream for TcpStream {
~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream:Send
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe {
libc::shutdown(self.fd(), libc::SHUT_WR)
})
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/libnative/io/pipe_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ impl rtio::RtioPipe for UnixStream {
fn clone(&self) -> ~rtio::RtioPipe:Send {
~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe:Send
}

fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 19 additions & 0 deletions src/librustuv/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Guard<'a> {
struct Inner {
queue: Vec<BlockedTask>,
held: bool,
closed: bool,
}

impl Access {
Expand All @@ -41,6 +42,7 @@ impl Access {
inner: UnsafeArc::new(Inner {
queue: vec![],
held: false,
closed: false,
})
}
}
Expand All @@ -64,6 +66,15 @@ impl Access {

Guard { access: self, missile: Some(missile) }
}

pub fn close(&self, _missile: &HomingMissile) {
// This unsafety is OK because with a homing missile we're guaranteed to
// be the only task looking at the `closed` flag (and are therefore
// allowed to modify it). Additionally, no atomics are necessary because
// everyone's running on the same thread and has already done the
// necessary synchronization to be running on this thread.
unsafe { (*self.inner.get()).closed = true; }
}
}

impl Clone for Access {
Expand All @@ -72,6 +83,14 @@ impl Clone for Access {
}
}

impl<'a> Guard<'a> {
pub fn is_closed(&self) -> bool {
// See above for why this unsafety is ok, it just applies to the read
// instead of the write.
unsafe { (*self.access.inner.get()).closed }
}
}

#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
Expand Down
104 changes: 59 additions & 45 deletions src/librustuv/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::cast;
use std::io;
use std::io::{IoError, IoResult};
use std::io::net::ip;
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
Expand Down Expand Up @@ -411,7 +412,13 @@ impl rtio::RtioSocket for TcpWatcher {
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let access = self.read_access.grant(m);

// see comments in close_read about this check
if access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}

self.stream.read(buf).map_err(uv_error_to_io_error)
}

Expand Down Expand Up @@ -466,36 +473,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
} as ~rtio::RtioTcpStream:Send
}

fn close_write(&mut self) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);

return match unsafe {
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };

wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
req.set_data(&cx);
});

status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
fn close_read(&mut self) -> Result<(), IoError> {
// see comments in PipeWatcher::close_read
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
Ok(())
}

extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
fn close_write(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
shutdown(self.handle, &self.uv_loop())
}
}

Expand Down Expand Up @@ -704,7 +692,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);

let a = match unsafe {
return match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
Expand All @@ -725,23 +713,21 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
n => Err(uv_error_to_io_error(UvError(n)))
};
return a;

extern fn alloc_cb(handle: *uvll::uv_udp_t,
_suggested_size: size_t,
buf: *mut Buf) {
unsafe {
let cx: &mut Ctx =
cast::transmute(uvll::get_data_for_uv_handle(handle));
let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
*buf = cx.buf.take().expect("recv alloc_cb called more than once")
}
}

extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
addr: *libc::sockaddr, _flags: c_uint) {
assert!(nread != uvll::ECANCELED as ssize_t);
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
let cx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
};

// When there's no data to read the recv callback can be a no-op.
Expand All @@ -752,13 +738,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
return
}

unsafe {
assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
}

let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
let addr = if addr == ptr::null() {
None
} else {
Expand Down Expand Up @@ -900,6 +880,40 @@ impl Drop for UdpWatcher {
}
}

////////////////////////////////////////////////////////////////////////////////
// Shutdown helper
////////////////////////////////////////////////////////////////////////////////

pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);

return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };

wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx);
});

status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};

extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
}

#[cfg(test)]
mod test {
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
Expand Down
38 changes: 37 additions & 1 deletion src/librustuv/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use std::c_str::CString;
use std::io::IoError;
use std::io;
use libc;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};

Expand Down Expand Up @@ -111,7 +112,13 @@ impl PipeWatcher {
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let access = self.read_access.grant(m);

// see comments in close_read about this check
if access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}

self.stream.read(buf).map_err(uv_error_to_io_error)
}

Expand All @@ -131,6 +138,35 @@ impl RtioPipe for PipeWatcher {
write_access: self.write_access.clone(),
} as ~RtioPipe:Send
}

fn close_read(&mut self) -> Result<(), IoError> {
// The current uv_shutdown method only shuts the writing half of the
// connection, and no method is provided to shut down the reading half
// of the connection. With a lack of method, we emulate shutting down
// the reading half of the connection by manually returning early from
// all future calls to `read`.
//
// Note that we must be careful to ensure that *all* cloned handles see
// the closing of the read half, so we stored the "is closed" bit in the
// Access struct, not in our own personal watcher. Additionally, the
// homing missile is used as a locking mechanism to ensure there is no
// contention over this bit.
//
// To shutdown the read half, we must first flag the access as being
// closed, and then afterwards we cease any pending read. Note that this
// ordering is crucial because we could in theory be rescheduled during
// the uv_read_stop which means that another read invocation could leak
// in before we set the flag.
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
Ok(())
}

fn close_write(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
net::shutdown(self.stream.handle, &self.uv_loop())
}
}

impl HomingIO for PipeWatcher {
Expand Down
Loading