Skip to content

Commit

Permalink
refactor(Scheduler): change order of arguments to schedule method
Browse files Browse the repository at this point in the history
schedule is now: schedule(work, delay?, state?)
also cleaned up some tests that still used scheduling from the early days when subscription was still async

closes #265
  • Loading branch information
benlesh committed Sep 4, 2015
1 parent 9e62789 commit e077230
Show file tree
Hide file tree
Showing 25 changed files with 69 additions and 99 deletions.
8 changes: 4 additions & 4 deletions spec/scheduler-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ describe('Scheduler.immediate', function() {
var call1 = false;
var call2 = false;
Scheduler.immediate.active = false;
Scheduler.immediate.schedule(0, null, function(){
Scheduler.immediate.schedule(function () {
call1 = true;
Scheduler.immediate.schedule(0, null, function(){
Scheduler.immediate.schedule(function(){
call2 = true;
});
});
Expand All @@ -20,9 +20,9 @@ describe('Scheduler.immediate', function() {

it('should schedule things in the future too', function (done) {
var called = false;
Scheduler.immediate.schedule(500, null, function () {
Scheduler.immediate.schedule(function () {
called = true;
});
}, 500);

setTimeout(function () {
expect(called).toBe(false);
Expand Down
44 changes: 13 additions & 31 deletions spec/subject-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ describe('Subject', function () {
it('should pump values right on through itself', function (done) {
var subject = new Subject();
var expected = ['foo', 'bar'];
var i = 0;

subject.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null,
function () {
done();
});
expect(x).toBe(expected.shift());
}, null, done);

subject.next('foo');
subject.next('bar');
Expand All @@ -34,39 +30,25 @@ describe('Subject', function () {

subject.subscribe(function (x) {
expect(x).toBe(expected[j++]);
}, null,
function () {
done();
});
}, null, done);

// HACK
nextTick.schedule(0, null, function () {
expect(subject.observers.length).toBe(2);
subject.next('foo');
subject.next('bar');
subject.complete();
});
expect(subject.observers.length).toBe(2);
subject.next('foo');
subject.next('bar');
subject.complete();
});

it('should not allow values to be nexted after a return', function (done) {
var subject = new Subject();
var expected = ['foo'];
var i = 0;

subject.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null,
function () {
//HACK
nextTick.schedule(0, null, done);
});

// HACK
nextTick.schedule(0, null, function () {
subject.next('foo');
subject.complete();
subject.next('bar');
});
expect(x).toBe(expected.shift());
}, null, done);

subject.next('foo');
subject.complete();
subject.next('bar');
});

it('should clean out unsubscribed subscribers', function (done) {
Expand Down
14 changes: 4 additions & 10 deletions spec/subjects/behavior-subject-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@ describe('BehaviorSubject', function() {

subject.subscribe(function(x) {
expect(x).toBe(expected[i++]);
}, null,
function(){
done();
});

// HACK
Rx.Scheduler.nextTick.schedule(0, null, function(){
subject.next('bar');
subject.complete();
});
}, null, done);

subject.next('bar');
subject.complete();
});
});
2 changes: 1 addition & 1 deletion src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Subscription from './Subscription';
interface Scheduler {
now(): number;

schedule<T>(delay: number, state: any, work: (state?: any) => Subscription<T>|void): Subscription<T>;
schedule<T>(work: (state?: any) => Subscription<T>|void, delay?: number, state?: any): Subscription<T>;
}

export default Scheduler;
4 changes: 2 additions & 2 deletions src/observables/ArrayObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export default class ArrayObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, {
subscriber.add(scheduler.schedule(ArrayObservable.dispatch, 0, {
array, index, count, subscriber
}, ArrayObservable.dispatch));
}));
} else {
do {
if (index >= count) {
Expand Down
2 changes: 1 addition & 1 deletion src/observables/EmptyObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export default class EmptyObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, { subscriber }, EmptyObservable.dispatch));
subscriber.add(scheduler.schedule(EmptyObservable.dispatch, 0, { subscriber }));
} else {
subscriber.complete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/observables/ErrorObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ export default class ErrorObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, {
subscriber.add(scheduler.schedule(ErrorObservable.dispatch, 0, {
error, subscriber
}, ErrorObservable.dispatch));
}));
} else {
subscriber.error(error);
}
Expand Down
4 changes: 2 additions & 2 deletions src/observables/IntervalObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ export default class IntervalObservable<T> extends Observable<T> {
const period = this.period;
const scheduler = this.scheduler;

subscriber.add(scheduler.schedule(period, {
subscriber.add(scheduler.schedule(IntervalObservable.dispatch, period, {
index, subscriber
}, IntervalObservable.dispatch));
}));
}
}
4 changes: 2 additions & 2 deletions src/observables/IteratorObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ export default class IteratorObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, {
subscriber.add(scheduler.schedule(IteratorObservable.dispatch, 0, {
index, thisArg, project, iterator, subscriber
}, IteratorObservable.dispatch));
}));
} else {
do {
let result = iterator.next();
Expand Down
4 changes: 2 additions & 2 deletions src/observables/PromiseObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export default class PromiseObservable<T> extends Observable<T> {
err => subscriber.error(err));
} else {
let subscription = new Subscription();
promise.then(value => subscription.add(scheduler.schedule(0, { value, subscriber }, dispatchNext)),
err => subscription.add(scheduler.schedule(0, { err, subscriber }, dispatchError)));
promise.then(value => subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })),
err => subscription.add(scheduler.schedule(dispatchError, 0, { err, subscriber })));
return subscription;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/observables/RangeObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ export default class RangeObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, {
subscriber.add(scheduler.schedule(RangeObservable.dispatch, 0, {
index, end, start, subscriber
}, RangeObservable.dispatch));
}));
} else {
do {
if (index++ >= end) {
Expand Down
4 changes: 2 additions & 2 deletions src/observables/ScalarObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ export default class ScalarObservable<T> extends Observable<T> {
const scheduler = this.scheduler;

if (scheduler) {
subscriber.add(scheduler.schedule(0, {
subscriber.add(scheduler.schedule(ScalarObservable.dispatch, 0, {
done: false, value, subscriber
}, ScalarObservable.dispatch));
}));
} else {
subscriber.next(value);
if (!subscriber.isUnsubscribed) {
Expand Down
4 changes: 2 additions & 2 deletions src/observables/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export default class SubscribeOnObservable<T> extends Observable<T> {
const source = this.source;
const scheduler = this.scheduler;

subscriber.add(scheduler.schedule(delay, {
subscriber.add(scheduler.schedule(SubscribeOnObservable.dispatch, delay, {
source, subscriber
}, SubscribeOnObservable.dispatch));
}));
}
}
8 changes: 3 additions & 5 deletions src/observables/TimerObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ export default class TimerObservable<T> extends Observable<T> {
}

if (typeof action.delay === 'undefined') {
action.add(action.scheduler.schedule(period, {
action.add(action.scheduler.schedule(TimerObservable.dispatch, period, {
index: index + 1, period, subscriber
}, TimerObservable.dispatch));
}));
} else {
state.index = index + 1;
action.schedule(state);
Expand Down Expand Up @@ -55,8 +55,6 @@ export default class TimerObservable<T> extends Observable<T> {
const dueTime = this.dueTime;
const scheduler = this.scheduler;

subscriber.add(scheduler.schedule(dueTime, {
index, period, subscriber
}, TimerObservable.dispatch));
subscriber.add(scheduler.schedule(TimerObservable.dispatch, dueTime, { index, period, subscriber }));
}
}
8 changes: 4 additions & 4 deletions src/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
super(destination);
let buffer = this.openBuffer();
if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferClose));
this.add(scheduler.schedule(bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }, dispatchBufferCreation));
this.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: this, buffer }));
this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }));
} else {
this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferTimeSpanOnly));
this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, { subscriber: this, buffer }));
}
}

Expand Down Expand Up @@ -89,7 +89,7 @@ function dispatchBufferCreation(state) {
let { bufferTimeSpan, subscriber, scheduler } = state;
let buffer = subscriber.openBuffer();
var action = <Action<any>>this;
action.add(scheduler.schedule(bufferTimeSpan, { subscriber, buffer }, dispatchBufferClose));
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
action.schedule(state);
}

Expand Down
2 changes: 1 addition & 1 deletion src/operators/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DebounceSubscriber<T> extends Subscriber<T> {

_next(value: T) {
if (!this.debounced) {
this.add(this.debounced = this.scheduler.schedule(this.dueTime, { value, subscriber: this }, dispatchNext));
this.add(this.debounced = this.scheduler.schedule(dispatchNext, this.dueTime, { value, subscriber: this }));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/operators/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class DelaySubscriber<T> extends Subscriber<T> {

_schedule(scheduler) {
this.active = true;
this.add(scheduler.schedule(this.delay, {
this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}, DelaySubscriber.dispatch));
}));
}
}

Expand Down
16 changes: 6 additions & 10 deletions src/operators/observeOn-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,18 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

_next(x) {
this.add(this.scheduler.schedule(this.delay,
new ObserveOnMessage(Notification.createNext(x), this.destination),
ObserveOnSubscriber.dispatch)
);
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay,
new ObserveOnMessage(Notification.createNext(x), this.destination)));
}

_error(e) {
this.add(this.scheduler.schedule(this.delay,
new ObserveOnMessage(Notification.createError(e), this.destination),
ObserveOnSubscriber.dispatch));
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay,
new ObserveOnMessage(Notification.createError(e), this.destination)));
}

_complete() {
this.add(this.scheduler.schedule(this.delay,
new ObserveOnMessage(Notification.createComplete(), this.destination),
ObserveOnSubscriber.dispatch));
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay,
new ObserveOnMessage(Notification.createComplete(), this.destination)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operators/sampleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SampleTimeSubscriber<T> extends Subscriber<T> {

constructor(destination: Observer<T>, private delay: number, private scheduler: Scheduler) {
super(destination);
this.add(scheduler.schedule(delay, { subscriber: this }, dispatchNotification));
this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this }));
}

_next(value: T) {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ThrottleSubscriber<T, R> extends Subscriber<T> {

_next(x) {
this.clearThrottle();
this.add(this.throttled = this.scheduler.schedule(this.delay, { value: x, subscriber: this }, dispatchNext));
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value: x, subscriber: this }));
}

throttledNext(x) {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
scheduler.schedule(dispatchTimeout, delay, { subscriber: this });
}

sendTimeoutError() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/timeoutWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TimeoutWithSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
scheduler.schedule(dispatchTimeout, delay, { subscriber: this });
}

handleTimeout() {
Expand Down
8 changes: 4 additions & 4 deletions src/operators/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
super(destination);
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
let window = this.openWindow();
this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window, context: null }, dispatchWindowClose))
this.add(scheduler.schedule(windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }, dispatchWindowCreation))
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber: this, window, context: null }))
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }))
} else {
let window = this.openWindow();
this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window }, dispatchWindowTimeSpanOnly));
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, { subscriber: this, window }));
}
}

Expand Down Expand Up @@ -96,7 +96,7 @@ function dispatchWindowCreation(state) {
let window = subscriber.openWindow();
let action = <Action<any>>this;
let context = { action, subscription: null };
action.add(context.subscription = scheduler.schedule(windowTimeSpan, { subscriber, window, context }, dispatchWindowClose));
action.add(context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber, window, context }));
action.schedule(state);
}

Expand Down
Loading

0 comments on commit e077230

Please sign in to comment.