Skip to content

Commit

Permalink
Remove 'static bounds to allow non-'static data.
Browse files Browse the repository at this point in the history
  • Loading branch information
reem committed May 11, 2015
1 parent 21236cb commit 8a229e1
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 326 deletions.
66 changes: 33 additions & 33 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ use self::Lifecycle::*;
*/

// Core implementation of Future & Stream
pub struct Core<T: Send + 'static, E: Send + 'static> {
ptr: *mut CoreInner<T, E>,
pub struct Core<'a, T: Send + 'a, E: Send + 'a> {
ptr: *mut CoreInner<'a, T, E>,
}

impl<T: Send + 'static, E: Send + 'static> Core<T, E> {
pub fn new() -> Core<T, E> {
impl<'a, T: Send + 'a, E: Send + 'a> Core<'a, T, E> {
pub fn new() -> Core<'a, T, E> {
let ptr = Box::new(CoreInner::<T, E>::new());
Core { ptr: unsafe { mem::transmute(ptr) }}
}

pub fn with_value(val: AsyncResult<T, E>) -> Core<T, E> {
pub fn with_value(val: AsyncResult<T, E>) -> Core<'a, T, E> {
let ptr = Box::new(CoreInner::<T, E>::with_value(val));
Core { ptr: unsafe { mem::transmute(ptr) }}
}
Expand Down Expand Up @@ -62,7 +62,7 @@ impl<T: Send + 'static, E: Send + 'static> Core<T, E> {

/// Registers a callback that will be invoked when calling `consumer_poll`
/// will return a value.
pub fn consumer_ready<F: FnOnce(Core<T, E>) + Send + 'static>(&self, f: F) -> Option<u64> {
pub fn consumer_ready<F: FnOnce(Core<T, E>) + Send + 'a>(&self, f: F) -> Option<u64> {
self.inner().consumer_ready(f)
}

Expand All @@ -78,7 +78,7 @@ impl<T: Send + 'static, E: Send + 'static> Core<T, E> {
self.inner().producer_is_err()
}

pub fn producer_poll(&self) -> Option<AsyncResult<Core<T, E>, ()>> {
pub fn producer_poll(&self) -> Option<AsyncResult<Core<'a, T, E>, ()>> {
self.inner().producer_poll()
}

Expand All @@ -94,7 +94,7 @@ impl<T: Send + 'static, E: Send + 'static> Core<T, E> {
}
}

pub fn producer_ready<F: FnOnce(Core<T, E>) + Send + 'static>(&self, f: F) {
pub fn producer_ready<F: FnOnce(Core<T, E>) + Send + 'a>(&self, f: F) {
self.inner().producer_ready(f);
}

Expand All @@ -107,24 +107,24 @@ impl<T: Send + 'static, E: Send + 'static> Core<T, E> {
}

#[inline]
fn inner(&self) -> &CoreInner<T, E> {
fn inner(&self) -> &CoreInner<'a, T, E> {
unsafe { &*self.ptr }
}

#[inline]
fn inner_mut(&mut self) -> &mut CoreInner<T, E> {
fn inner_mut(&mut self) -> &mut CoreInner<'a, T, E> {
unsafe { &mut *self.ptr }
}
}

impl<T: Send + 'static, E: Send + 'static> Clone for Core<T, E> {
fn clone(&self) -> Core<T, E> {
impl<'a, T: Send + 'a, E: Send + 'a> Clone for Core<'a, T, E> {
fn clone(&self) -> Core<'a, T, E> {
// Increments ref count and returns a new core
self.inner().core()
}
}

impl<T: Send + 'static, E: Send + 'static> Drop for Core<T, E> {
impl<'a, T: Send + 'a, E: Send + 'a> Drop for Core<'a, T, E> {
fn drop(&mut self) {
if self.inner().ref_dec(Release) != 1 {
return;
Expand Down Expand Up @@ -153,17 +153,17 @@ impl<T: Send + 'static, E: Send + 'static> Drop for Core<T, E> {
}
}

unsafe impl<T: Send + 'static, E: Send + 'static> Send for Core<T, E> { }
unsafe impl<'a, T: Send + 'a, E: Send + 'a> Send for Core<'a, T, E> { }

pub fn get<T: Send + 'static, E: Send + 'static>(core: &Option<Core<T, E>>) -> &Core<T, E> {
pub fn get<'a, 'b, T: Send + 'a, E: Send + 'a>(core: &'b Option<Core<'a, T, E>>) -> &'b Core<'a, T, E> {
core.as_ref().expect("expected future core")
}

pub fn get_mut<T: Send + 'static, E: Send + 'static>(core: &mut Option<Core<T, E>>) -> &mut Core<T, E> {
pub fn get_mut<'a, 'b, T: Send + 'a, E: Send + 'a>(core: &'b mut Option<Core<'a, T, E>>) -> &'b mut Core<'a, T, E> {
core.as_mut().expect("expected future core")
}

pub fn take<T: Send + 'static, E: Send + 'static>(core: &mut Option<Core<T, E>>) -> Core<T, E> {
pub fn take<'a, T: Send + 'a, E: Send + 'a>(core: &mut Option<Core<'a, T, E>>) -> Core<'a, T, E> {
core.take().expect("expected future core")
}

Expand All @@ -174,16 +174,16 @@ pub fn take<T: Send + 'static, E: Send + 'static>(core: &mut Option<Core<T, E>>)
*/


struct CoreInner<T: Send + 'static, E: Send + 'static> {
struct CoreInner<'a, T: Send + 'a, E: Send + 'a> {
refs: AtomicUsize,
state: AtomicState,
consumer_wait: Option<Callback<T, E>>,
producer_wait: Option<Callback<T, E>>,
consumer_wait: Option<Callback<'a, T, E>>,
producer_wait: Option<Callback<'a, T, E>>,
val: Option<AsyncResult<T, E>>,
}

impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
fn new() -> CoreInner<T, E> {
impl<'a, T: Send + 'a, E: Send + 'a> CoreInner<'a, T, E> {
fn new() -> CoreInner<'a, T, E> {
CoreInner {
refs: AtomicUsize::new(1),
state: AtomicState::new(),
Expand All @@ -193,7 +193,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
}
}

fn with_value(val: AsyncResult<T, E>) -> CoreInner<T, E> {
fn with_value(val: AsyncResult<T, E>) -> CoreInner<'a, T, E> {
CoreInner {
refs: AtomicUsize::new(1),
state: AtomicState::of(Ready),
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
Some(self.consume_val(curr))
}

fn consumer_ready<F: FnOnce(Core<T, E>) + Send + 'static>(&self, f: F) -> Option<u64> {
fn consumer_ready<F: FnOnce(Core<T, E>) + Send + 'a>(&self, f: F) -> Option<u64> {
let mut curr = self.state.load(Relaxed);

debug!("Core::consumer_ready; state={:?}", curr);
Expand Down Expand Up @@ -414,7 +414,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
unimplemented!();
}

pub fn producer_poll(&self) -> Option<AsyncResult<Core<T, E>, ()>> {
pub fn producer_poll(&self) -> Option<AsyncResult<Core<'a, T, E>, ()>> {
let curr = self.state.load(Relaxed);

debug!("Core::producer_poll; state={:?}", curr);
Expand All @@ -430,7 +430,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
Some(Ok(self.core()))
}

fn producer_ready<F: FnOnce(Core<T, E>) + Send + 'static >(&self, f: F) {
fn producer_ready<F: FnOnce(Core<T, E>) + Send + 'a >(&self, f: F) {
let mut curr = self.state.load(Relaxed);

debug!("Core::producer_ready; state={:?}", curr);
Expand Down Expand Up @@ -730,28 +730,28 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
self.val.take().expect("expected a value")
}

fn put_consumer_wait(&self, cb: Callback<T, E>) {
fn put_consumer_wait(&self, cb: Callback<'a, T, E>) {
unsafe {
let s: &mut CoreInner<T, E> = mem::transmute(self);
s.consumer_wait = Some(cb);
}
}

fn take_consumer_wait(&self) -> Callback<T, E> {
fn take_consumer_wait(&self) -> Callback<'a, T, E> {
unsafe {
let s: &mut CoreInner<T, E> = mem::transmute(self);
s.consumer_wait.take().expect("consumer_wait is none")
}
}

fn put_producer_wait(&self, cb: Callback<T, E>) {
fn put_producer_wait(&self, cb: Callback<'a, T, E>) {
unsafe {
let s: &mut CoreInner<T, E> = mem::transmute(self);
s.producer_wait = Some(cb);
}
}

fn take_producer_wait(&self) -> Callback<T, E> {
fn take_producer_wait(&self) -> Callback<'a, T, E> {
unsafe {
let s: &mut CoreInner<T, E> = mem::transmute(self);
s.producer_wait.take().expect("producer_wait is none")
Expand All @@ -766,7 +766,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
self.refs.fetch_sub(1, order)
}

fn core(&self) -> Core<T, E> {
fn core(&self) -> Core<'a, T, E> {
// Using a relaxed ordering is alright here, as knowledge of the original reference
// prevents other threads from erroneously deleting the object.
//
Expand All @@ -781,7 +781,7 @@ impl<T: Send + 'static, E: Send + 'static> CoreInner<T, E> {
}
}

unsafe impl<T: Send + 'static, E: Send + 'static> Send for CoreInner<T, E> { }
unsafe impl<'a, T: Send + 'a, E: Send + 'a> Send for CoreInner<'a, T, E> { }

struct AtomicState {
atomic: AtomicU64,
Expand Down Expand Up @@ -945,7 +945,7 @@ impl fmt::Debug for State {
}
}

type Callback<T, E> = Box<BoxedReceive<Core<T, E>>>;
type Callback<'a, T, E> = Box<BoxedReceive<Core<'a, T, E>> + 'a>;

#[derive(Debug, PartialEq, Eq)]
enum Lifecycle {
Expand Down
Loading

0 comments on commit 8a229e1

Please sign in to comment.