Skip to content

Commit

Permalink
Add Send/Sync bounds to many generic concurrent types.
Browse files Browse the repository at this point in the history
These types are purely designed for concurrent code, and, at the moment,
are restricted to be used in those situations. This solidifies that goal
by imposing a strict restriction on the generic types from the start,
i.e. in the definition itself.

This is the opposite to rust-lang#23176 which relaxes the bounds entirely (it is
backwards compatible to switch to that more flexible approach).

Unfortunately the message-passing primitives in std (the return value
from a thread, and sync::mpsc) aren't great about how they work with
mutability and sharing, and so require hacky error-prone `unsafe impl`s.
However, they are purely implementation details: the interface isn't
affected by having to make that internal change, and clean-ups to the
code should be able to remove the hacks.
  • Loading branch information
huonw committed Apr 1, 2015
1 parent d528aa9 commit 42094f3
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 50 deletions.
40 changes: 20 additions & 20 deletions src/liballoc/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ use heap::deallocate;
/// ```
#[unsafe_no_drop_flag]
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Arc<T> {
pub struct Arc<T: Send + Sync> {
// FIXME #12808: strange name to try to avoid interfering with
// field accesses of the contained type via Deref
_ptr: NonZero<*mut ArcInner<T>>,
Expand All @@ -136,7 +136,7 @@ unsafe impl<T: Sync + Send> Sync for Arc<T> { }
#[unsafe_no_drop_flag]
#[unstable(feature = "alloc",
reason = "Weak pointers may not belong in this module.")]
pub struct Weak<T> {
pub struct Weak<T: Send + Sync> {
// FIXME #12808: strange name to try to avoid interfering with
// field accesses of the contained type via Deref
_ptr: NonZero<*mut ArcInner<T>>,
Expand All @@ -146,13 +146,13 @@ unsafe impl<T: Sync + Send> Send for Weak<T> { }
unsafe impl<T: Sync + Send> Sync for Weak<T> { }

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: fmt::Debug> fmt::Debug for Weak<T> {
impl<T: fmt::Debug + Send + Sync> fmt::Debug for Weak<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "(Weak)")
}
}

struct ArcInner<T> {
struct ArcInner<T: Send + Sync> {
strong: atomic::AtomicUsize,
weak: atomic::AtomicUsize,
data: T,
Expand All @@ -161,7 +161,7 @@ struct ArcInner<T> {
unsafe impl<T: Sync + Send> Send for ArcInner<T> {}
unsafe impl<T: Sync + Send> Sync for ArcInner<T> {}

impl<T> Arc<T> {
impl<T: Send + Sync> Arc<T> {
/// Constructs a new `Arc<T>`.
///
/// # Examples
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<T> Arc<T> {
}
}

impl<T> Arc<T> {
impl<T: Send + Sync> Arc<T> {
#[inline]
fn inner(&self) -> &ArcInner<T> {
// This unsafety is ok because while this arc is alive we're guaranteed
Expand Down Expand Up @@ -235,15 +235,15 @@ impl<T> Arc<T> {
/// Get the number of weak references to this value.
#[inline]
#[unstable(feature = "alloc")]
pub fn weak_count<T>(this: &Arc<T>) -> usize { this.inner().weak.load(SeqCst) - 1 }
pub fn weak_count<T: Send + Sync>(this: &Arc<T>) -> usize { this.inner().weak.load(SeqCst) - 1 }

/// Get the number of strong references to this value.
#[inline]
#[unstable(feature = "alloc")]
pub fn strong_count<T>(this: &Arc<T>) -> usize { this.inner().strong.load(SeqCst) }
pub fn strong_count<T: Send + Sync>(this: &Arc<T>) -> usize { this.inner().strong.load(SeqCst) }

#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for Arc<T> {
impl<T: Send + Sync> Clone for Arc<T> {
/// Makes a clone of the `Arc<T>`.
///
/// This increases the strong reference count.
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<T> Clone for Arc<T> {
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Deref for Arc<T> {
impl<T: Send + Sync> Deref for Arc<T> {
type Target = T;

#[inline]
Expand Down Expand Up @@ -324,7 +324,7 @@ impl<T: Send + Sync + Clone> Arc<T> {

#[unsafe_destructor]
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Arc<T> {
impl<T: Send + Sync> Drop for Arc<T> {
/// Drops the `Arc<T>`.
///
/// This will decrement the strong reference count. If the strong reference
Expand Down Expand Up @@ -392,7 +392,7 @@ impl<T> Drop for Arc<T> {

#[unstable(feature = "alloc",
reason = "Weak pointers may not belong in this module.")]
impl<T> Weak<T> {
impl<T: Send + Sync> Weak<T> {
/// Upgrades a weak reference to a strong reference.
///
/// Upgrades the `Weak<T>` reference to an `Arc<T>`, if possible.
Expand Down Expand Up @@ -458,7 +458,7 @@ impl<T: Sync + Send> Clone for Weak<T> {

#[unsafe_destructor]
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Weak<T> {
impl<T: Send + Sync> Drop for Weak<T> {
/// Drops the `Weak<T>`.
///
/// This will decrement the weak reference count.
Expand Down Expand Up @@ -503,7 +503,7 @@ impl<T> Drop for Weak<T> {
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: PartialEq> PartialEq for Arc<T> {
impl<T: PartialEq + Send + Sync> PartialEq for Arc<T> {
/// Equality for two `Arc<T>`s.
///
/// Two `Arc<T>`s are equal if their inner value are equal.
Expand Down Expand Up @@ -535,7 +535,7 @@ impl<T: PartialEq> PartialEq for Arc<T> {
fn ne(&self, other: &Arc<T>) -> bool { *(*self) != *(*other) }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: PartialOrd> PartialOrd for Arc<T> {
impl<T: PartialOrd + Send + Sync> PartialOrd for Arc<T> {
/// Partial comparison for two `Arc<T>`s.
///
/// The two are compared by calling `partial_cmp()` on their inner values.
Expand Down Expand Up @@ -614,21 +614,21 @@ impl<T: PartialOrd> PartialOrd for Arc<T> {
fn ge(&self, other: &Arc<T>) -> bool { *(*self) >= *(*other) }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Ord> Ord for Arc<T> {
impl<T: Ord + Send + Sync> Ord for Arc<T> {
fn cmp(&self, other: &Arc<T>) -> Ordering { (**self).cmp(&**other) }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Eq> Eq for Arc<T> {}
impl<T: Eq + Send + Sync> Eq for Arc<T> {}

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: fmt::Display> fmt::Display for Arc<T> {
impl<T: fmt::Display + Send + Sync> fmt::Display for Arc<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: fmt::Debug> fmt::Debug for Arc<T> {
impl<T: fmt::Debug + Send + Sync> fmt::Debug for Arc<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&**self, f)
}
Expand All @@ -641,7 +641,7 @@ impl<T: Default + Sync + Send> Default for Arc<T> {
}

#[stable(feature = "rust1", since = "1.0.0")]
impl<T: Hash> Hash for Arc<T> {
impl<T: Hash + Send + Sync> Hash for Arc<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
(**self).hash(state)
}
Expand Down
4 changes: 2 additions & 2 deletions src/libcollections/borrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use core::clone::Clone;
use core::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
use core::convert::AsRef;
use core::hash::{Hash, Hasher};
use core::marker::Sized;
use core::marker::{Send, Sized, Sync};
use core::ops::Deref;
use core::option::Option;

Expand Down Expand Up @@ -115,7 +115,7 @@ impl<T> Borrow<T> for rc::Rc<T> {
fn borrow(&self) -> &T { &**self }
}

impl<T> Borrow<T> for arc::Arc<T> {
impl<T: Send + Sync> Borrow<T> for arc::Arc<T> {
fn borrow(&self) -> &T { &**self }
}

Expand Down
2 changes: 1 addition & 1 deletion src/libserialize/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ impl<T: Decodable> Decodable for RefCell<T> {
}
}

impl<T:Encodable> Encodable for Arc<T> {
impl<T:Sync+Send+Encodable> Encodable for Arc<T> {
fn encode<S: Encoder>(&self, s: &mut S) -> Result<(), S::Error> {
(**self).encode(s)
}
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/io/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ use cell::UnsafeCell;
use rt;
use sync::{StaticMutex, Arc};

pub struct Lazy<T> {
pub struct Lazy<T: Send + Sync> {
pub lock: StaticMutex,
pub ptr: UnsafeCell<*mut Arc<T>>,
pub init: fn() -> Arc<T>,
}

unsafe impl<T> Sync for Lazy<T> {}
unsafe impl<T: Send + Sync> Sync for Lazy<T> {}

macro_rules! lazy_init {
($init:expr) => (::io::lazy::Lazy {
Expand Down
50 changes: 35 additions & 15 deletions src/libstd/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,10 @@ unsafe impl<T: Send> Send for Sender<T> { }
/// owned by one task, but it can be cloned to send to other tasks.
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T: Send> {
inner: Arc<UnsafeCell<sync::Packet<T>>>,
inner: Arc<SyncUnsafeCell<sync::Packet<T>>>,
}

unsafe impl<T: Send> Send for SyncSender<T> {}

impl<T> !Sync for SyncSender<T> {}
impl<T: Send> !Sync for SyncSender<T> {}

/// An error returned from the `send` function on channels.
///
Expand Down Expand Up @@ -434,11 +432,33 @@ pub enum TrySendError<T> {
}

enum Flavor<T:Send> {
Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
Stream(Arc<UnsafeCell<stream::Packet<T>>>),
Shared(Arc<UnsafeCell<shared::Packet<T>>>),
Sync(Arc<UnsafeCell<sync::Packet<T>>>),
Oneshot(Arc<SyncUnsafeCell<oneshot::Packet<T>>>),
Stream(Arc<SyncUnsafeCell<stream::Packet<T>>>),
Shared(Arc<SyncUnsafeCell<shared::Packet<T>>>),
Sync(Arc<SyncUnsafeCell<sync::Packet<T>>>),
}

// the channel impls are free and fast with mutability and sharing, so
// we have to hack around this with this type.
struct SyncUnsafeCell<T> {
data: UnsafeCell<T>
}
impl<T> SyncUnsafeCell<T> {
pub fn new(x: T) -> SyncUnsafeCell<T> {
SyncUnsafeCell {
data: UnsafeCell::new(x)
}
}
}
impl<T> ::ops::Deref for SyncUnsafeCell<T> {
type Target = UnsafeCell<T>;
fn deref(&self) -> &UnsafeCell<T> {
&self.data
}
}

unsafe impl<T: Send> Send for SyncUnsafeCell<T> {}
unsafe impl<T: Send> Sync for SyncUnsafeCell<T> {}

#[doc(hidden)]
trait UnsafeFlavor<T:Send> {
Expand Down Expand Up @@ -489,7 +509,7 @@ impl<T:Send> UnsafeFlavor<T> for Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
let a = Arc::new(SyncUnsafeCell::new(oneshot::Packet::new()));
(Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
}

Expand Down Expand Up @@ -529,7 +549,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T: Send>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
let a = Arc::new(SyncUnsafeCell::new(sync::Packet::new(bound)));
(SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
}

Expand Down Expand Up @@ -576,12 +596,12 @@ impl<T: Send> Sender<T> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
unsafe {
let p = p.get();
let p = p.data.get();
if !(*p).sent() {
return (*p).send(t).map_err(SendError);
} else {
let a =
Arc::new(UnsafeCell::new(stream::Packet::new()));
Arc::new(SyncUnsafeCell::new(stream::Packet::new()));
let rx = Receiver::new(Flavor::Stream(a.clone()));
match (*p).upgrade(rx) {
oneshot::UpSuccess => {
Expand Down Expand Up @@ -623,7 +643,7 @@ impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
Flavor::Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
let a = Arc::new(SyncUnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
Expand All @@ -635,7 +655,7 @@ impl<T: Send> Clone for Sender<T> {
}
}
Flavor::Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
let a = Arc::new(SyncUnsafeCell::new(shared::Packet::new()));
unsafe {
let guard = (*a.get()).postinit_lock();
let rx = Receiver::new(Flavor::Shared(a.clone()));
Expand Down Expand Up @@ -681,7 +701,7 @@ impl<T: Send> Drop for Sender<T> {
////////////////////////////////////////////////////////////////////////////////

impl<T: Send> SyncSender<T> {
fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
fn new(inner: Arc<SyncUnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
SyncSender { inner: inner }
}

Expand Down
2 changes: 2 additions & 0 deletions src/libstd/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct MutexGuard<'a, T: 'a> {
__poison: poison::Guard,
}

// It is undefined behaviour to unlock a mutex on a thread other than
// the one that locked it.
impl<'a, T> !marker::Send for MutexGuard<'a, T> {}

/// Static initialization of a mutex. This constant can be used to initialize
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use fmt;
/// } // write lock is dropped here
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RwLock<T> {
pub struct RwLock<T: Send + Sync> {
inner: Box<StaticRwLock>,
data: UnsafeCell<T>,
}
Expand Down Expand Up @@ -251,7 +251,7 @@ impl<T: Send + Sync> RwLock<T> {

#[unsafe_destructor]
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for RwLock<T> {
impl<T: Send + Sync> Drop for RwLock<T> {
fn drop(&mut self) {
unsafe { self.inner.lock.destroy() }
}
Expand Down
20 changes: 12 additions & 8 deletions src/libstd/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Builder {
let my_thread = Thread::new(name);
let their_thread = my_thread.clone();

let my_packet = Packet(Arc::new(UnsafeCell::new(None)));
let my_packet = Packet(Arc::new(PacketInner { data: UnsafeCell::new(None) }));
let their_packet = Packet(my_packet.0.clone());

// Spawning a new OS thread guarantees that __morestack will never get
Expand Down Expand Up @@ -331,7 +331,7 @@ impl Builder {
}
};
unsafe {
*their_packet.0.get() = Some(match (output, try_result) {
*their_packet.0.data.get() = Some(match (output, try_result) {
(Some(data), Ok(_)) => Ok(data),
(None, Err(cause)) => Err(cause),
_ => unreachable!()
Expand Down Expand Up @@ -585,26 +585,30 @@ impl thread_info::NewThread for Thread {
#[stable(feature = "rust1", since = "1.0.0")]
pub type Result<T> = ::result::Result<T, Box<Any + Send + 'static>>;

struct Packet<T>(Arc<UnsafeCell<Option<Result<T>>>>);
struct Packet<T: Send>(Arc<PacketInner<T>>);
// see also SyncUnsafeCell in sync/mspc.
struct PacketInner<T: Send> {
data: UnsafeCell<Option<Result<T>>>
}

unsafe impl<T:Send> Send for Packet<T> {}
unsafe impl<T> Sync for Packet<T> {}
unsafe impl<T:Send> Send for PacketInner<T> {}
unsafe impl<T> Sync for PacketInner<T> {}

/// Inner representation for JoinHandle and JoinGuard
struct JoinInner<T> {
struct JoinInner<T: Send> {
native: imp::rust_thread,
thread: Thread,
packet: Packet<T>,
joined: bool,
}

impl<T> JoinInner<T> {
impl<T: Send> JoinInner<T> {
fn join(&mut self) -> Result<T> {
assert!(!self.joined);
unsafe { imp::join(self.native) };
self.joined = true;
unsafe {
(*self.packet.0.get()).take().unwrap()
(*self.packet.0.data.get()).take().unwrap()
}
}
}
Expand Down

0 comments on commit 42094f3

Please sign in to comment.