From e077230d26be9e9b89169c0c05379c34e7042ff9 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 4 Sep 2015 11:57:26 -0700 Subject: [PATCH] refactor(Scheduler): change order of arguments to schedule method schedule is now: schedule(work, delay?, state?) also cleaned up some tests that still used scheduling from the early days when subscription was still async closes #265 --- spec/scheduler-spec.js | 8 ++--- spec/subject-spec.js | 44 +++++++----------------- spec/subjects/behavior-subject-spec.js | 14 +++----- src/Scheduler.ts | 2 +- src/observables/ArrayObservable.ts | 4 +-- src/observables/EmptyObservable.ts | 2 +- src/observables/ErrorObservable.ts | 4 +-- src/observables/IntervalObservable.ts | 4 +-- src/observables/IteratorObservable.ts | 4 +-- src/observables/PromiseObservable.ts | 4 +-- src/observables/RangeObservable.ts | 4 +-- src/observables/ScalarObservable.ts | 4 +-- src/observables/SubscribeOnObservable.ts | 4 +-- src/observables/TimerObservable.ts | 8 ++--- src/operators/bufferTime.ts | 8 ++--- src/operators/debounce.ts | 2 +- src/operators/delay.ts | 4 +-- src/operators/observeOn-support.ts | 16 ++++----- src/operators/sampleTime.ts | 2 +- src/operators/throttle.ts | 2 +- src/operators/timeout.ts | 2 +- src/operators/timeoutWith.ts | 2 +- src/operators/windowTime.ts | 8 ++--- src/schedulers/ImmediateScheduler.ts | 10 +++--- src/schedulers/NextTickScheduler.ts | 2 +- 25 files changed, 69 insertions(+), 99 deletions(-) diff --git a/spec/scheduler-spec.js b/spec/scheduler-spec.js index fcb073e40e..576200bb26 100644 --- a/spec/scheduler-spec.js +++ b/spec/scheduler-spec.js @@ -8,9 +8,9 @@ describe('Scheduler.immediate', function() { var call1 = false; var call2 = false; Scheduler.immediate.active = false; - Scheduler.immediate.schedule(0, null, function(){ + Scheduler.immediate.schedule(function () { call1 = true; - Scheduler.immediate.schedule(0, null, function(){ + Scheduler.immediate.schedule(function(){ call2 = true; }); }); @@ -20,9 +20,9 @@ describe('Scheduler.immediate', function() { it('should schedule things in the future too', function (done) { var called = false; - Scheduler.immediate.schedule(500, null, function () { + Scheduler.immediate.schedule(function () { called = true; - }); + }, 500); setTimeout(function () { expect(called).toBe(false); diff --git a/spec/subject-spec.js b/spec/subject-spec.js index 4e7072fb4e..47dedfc3ca 100644 --- a/spec/subject-spec.js +++ b/spec/subject-spec.js @@ -8,14 +8,10 @@ describe('Subject', function () { it('should pump values right on through itself', function (done) { var subject = new Subject(); var expected = ['foo', 'bar']; - var i = 0; subject.subscribe(function (x) { - expect(x).toBe(expected[i++]); - }, null, - function () { - done(); - }); + expect(x).toBe(expected.shift()); + }, null, done); subject.next('foo'); subject.next('bar'); @@ -34,39 +30,25 @@ describe('Subject', function () { subject.subscribe(function (x) { expect(x).toBe(expected[j++]); - }, null, - function () { - done(); - }); + }, null, done); - // HACK - nextTick.schedule(0, null, function () { - expect(subject.observers.length).toBe(2); - subject.next('foo'); - subject.next('bar'); - subject.complete(); - }); + expect(subject.observers.length).toBe(2); + subject.next('foo'); + subject.next('bar'); + subject.complete(); }); it('should not allow values to be nexted after a return', function (done) { var subject = new Subject(); var expected = ['foo']; - var i = 0; subject.subscribe(function (x) { - expect(x).toBe(expected[i++]); - }, null, - function () { - //HACK - nextTick.schedule(0, null, done); - }); - - // HACK - nextTick.schedule(0, null, function () { - subject.next('foo'); - subject.complete(); - subject.next('bar'); - }); + expect(x).toBe(expected.shift()); + }, null, done); + + subject.next('foo'); + subject.complete(); + subject.next('bar'); }); it('should clean out unsubscribed subscribers', function (done) { diff --git a/spec/subjects/behavior-subject-spec.js b/spec/subjects/behavior-subject-spec.js index 9e44eeeda6..690347fc06 100644 --- a/spec/subjects/behavior-subject-spec.js +++ b/spec/subjects/behavior-subject-spec.js @@ -18,15 +18,9 @@ describe('BehaviorSubject', function() { subject.subscribe(function(x) { expect(x).toBe(expected[i++]); - }, null, - function(){ - done(); - }); - - // HACK - Rx.Scheduler.nextTick.schedule(0, null, function(){ - subject.next('bar'); - subject.complete(); - }); + }, null, done); + + subject.next('bar'); + subject.complete(); }); }); \ No newline at end of file diff --git a/src/Scheduler.ts b/src/Scheduler.ts index b13a81a842..d0a9f3dbe2 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -3,7 +3,7 @@ import Subscription from './Subscription'; interface Scheduler { now(): number; - schedule(delay: number, state: any, work: (state?: any) => Subscription|void): Subscription; + schedule(work: (state?: any) => Subscription|void, delay?: number, state?: any): Subscription; } export default Scheduler; \ No newline at end of file diff --git a/src/observables/ArrayObservable.ts b/src/observables/ArrayObservable.ts index 752023d66d..834bb17565 100644 --- a/src/observables/ArrayObservable.ts +++ b/src/observables/ArrayObservable.ts @@ -59,9 +59,9 @@ export default class ArrayObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { + subscriber.add(scheduler.schedule(ArrayObservable.dispatch, 0, { array, index, count, subscriber - }, ArrayObservable.dispatch)); + })); } else { do { if (index >= count) { diff --git a/src/observables/EmptyObservable.ts b/src/observables/EmptyObservable.ts index 0e3da5efa7..cf1bed6113 100644 --- a/src/observables/EmptyObservable.ts +++ b/src/observables/EmptyObservable.ts @@ -20,7 +20,7 @@ export default class EmptyObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { subscriber }, EmptyObservable.dispatch)); + subscriber.add(scheduler.schedule(EmptyObservable.dispatch, 0, { subscriber })); } else { subscriber.complete(); } diff --git a/src/observables/ErrorObservable.ts b/src/observables/ErrorObservable.ts index 055d91752b..e195df483e 100644 --- a/src/observables/ErrorObservable.ts +++ b/src/observables/ErrorObservable.ts @@ -21,9 +21,9 @@ export default class ErrorObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { + subscriber.add(scheduler.schedule(ErrorObservable.dispatch, 0, { error, subscriber - }, ErrorObservable.dispatch)); + })); } else { subscriber.error(error); } diff --git a/src/observables/IntervalObservable.ts b/src/observables/IntervalObservable.ts index 85da74365b..d1001295c4 100644 --- a/src/observables/IntervalObservable.ts +++ b/src/observables/IntervalObservable.ts @@ -40,8 +40,8 @@ export default class IntervalObservable extends Observable { const period = this.period; const scheduler = this.scheduler; - subscriber.add(scheduler.schedule(period, { + subscriber.add(scheduler.schedule(IntervalObservable.dispatch, period, { index, subscriber - }, IntervalObservable.dispatch)); + })); } } diff --git a/src/observables/IteratorObservable.ts b/src/observables/IteratorObservable.ts index 3a707db2f9..7101ecb262 100644 --- a/src/observables/IteratorObservable.ts +++ b/src/observables/IteratorObservable.ts @@ -74,9 +74,9 @@ export default class IteratorObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { + subscriber.add(scheduler.schedule(IteratorObservable.dispatch, 0, { index, thisArg, project, iterator, subscriber - }, IteratorObservable.dispatch)); + })); } else { do { let result = iterator.next(); diff --git a/src/observables/PromiseObservable.ts b/src/observables/PromiseObservable.ts index 2bd4fc66de..501945ae01 100644 --- a/src/observables/PromiseObservable.ts +++ b/src/observables/PromiseObservable.ts @@ -26,8 +26,8 @@ export default class PromiseObservable extends Observable { err => subscriber.error(err)); } else { let subscription = new Subscription(); - promise.then(value => subscription.add(scheduler.schedule(0, { value, subscriber }, dispatchNext)), - err => subscription.add(scheduler.schedule(0, { err, subscriber }, dispatchError))); + promise.then(value => subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })), + err => subscription.add(scheduler.schedule(dispatchError, 0, { err, subscriber }))); return subscription; } } diff --git a/src/observables/RangeObservable.ts b/src/observables/RangeObservable.ts index 07fd593470..7f8e6b0fad 100644 --- a/src/observables/RangeObservable.ts +++ b/src/observables/RangeObservable.ts @@ -47,9 +47,9 @@ export default class RangeObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { + subscriber.add(scheduler.schedule(RangeObservable.dispatch, 0, { index, end, start, subscriber - }, RangeObservable.dispatch)); + })); } else { do { if (index++ >= end) { diff --git a/src/observables/ScalarObservable.ts b/src/observables/ScalarObservable.ts index 896347b5f6..ad987757a6 100644 --- a/src/observables/ScalarObservable.ts +++ b/src/observables/ScalarObservable.ts @@ -37,9 +37,9 @@ export default class ScalarObservable extends Observable { const scheduler = this.scheduler; if (scheduler) { - subscriber.add(scheduler.schedule(0, { + subscriber.add(scheduler.schedule(ScalarObservable.dispatch, 0, { done: false, value, subscriber - }, ScalarObservable.dispatch)); + })); } else { subscriber.next(value); if (!subscriber.isUnsubscribed) { diff --git a/src/observables/SubscribeOnObservable.ts b/src/observables/SubscribeOnObservable.ts index 62948254c7..a0b6bbe3bf 100644 --- a/src/observables/SubscribeOnObservable.ts +++ b/src/observables/SubscribeOnObservable.ts @@ -28,8 +28,8 @@ export default class SubscribeOnObservable extends Observable { const source = this.source; const scheduler = this.scheduler; - subscriber.add(scheduler.schedule(delay, { + subscriber.add(scheduler.schedule(SubscribeOnObservable.dispatch, delay, { source, subscriber - }, SubscribeOnObservable.dispatch)); + })); } } diff --git a/src/observables/TimerObservable.ts b/src/observables/TimerObservable.ts index bc96e6f3b5..9802594bfe 100644 --- a/src/observables/TimerObservable.ts +++ b/src/observables/TimerObservable.ts @@ -24,9 +24,9 @@ export default class TimerObservable extends Observable { } if (typeof action.delay === 'undefined') { - action.add(action.scheduler.schedule(period, { + action.add(action.scheduler.schedule(TimerObservable.dispatch, period, { index: index + 1, period, subscriber - }, TimerObservable.dispatch)); + })); } else { state.index = index + 1; action.schedule(state); @@ -55,8 +55,6 @@ export default class TimerObservable extends Observable { const dueTime = this.dueTime; const scheduler = this.scheduler; - subscriber.add(scheduler.schedule(dueTime, { - index, period, subscriber - }, TimerObservable.dispatch)); + subscriber.add(scheduler.schedule(TimerObservable.dispatch, dueTime, { index, period, subscriber })); } } diff --git a/src/operators/bufferTime.ts b/src/operators/bufferTime.ts index 07596edea0..a8c3a199d6 100644 --- a/src/operators/bufferTime.ts +++ b/src/operators/bufferTime.ts @@ -32,10 +32,10 @@ class BufferTimeSubscriber extends Subscriber { super(destination); let buffer = this.openBuffer(); if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { - this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferClose)); - this.add(scheduler.schedule(bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }, dispatchBufferCreation)); + this.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: this, buffer })); + this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler })); } else { - this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferTimeSpanOnly)); + this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, { subscriber: this, buffer })); } } @@ -89,7 +89,7 @@ function dispatchBufferCreation(state) { let { bufferTimeSpan, subscriber, scheduler } = state; let buffer = subscriber.openBuffer(); var action = >this; - action.add(scheduler.schedule(bufferTimeSpan, { subscriber, buffer }, dispatchBufferClose)); + action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer })); action.schedule(state); } diff --git a/src/operators/debounce.ts b/src/operators/debounce.ts index 1a452562b4..797e9b3013 100644 --- a/src/operators/debounce.ts +++ b/src/operators/debounce.ts @@ -33,7 +33,7 @@ class DebounceSubscriber extends Subscriber { _next(value: T) { if (!this.debounced) { - this.add(this.debounced = this.scheduler.schedule(this.dueTime, { value, subscriber: this }, dispatchNext)); + this.add(this.debounced = this.scheduler.schedule(dispatchNext, this.dueTime, { value, subscriber: this })); } } diff --git a/src/operators/delay.ts b/src/operators/delay.ts index ad0a4d9a4e..1aee04c9a7 100644 --- a/src/operators/delay.ts +++ b/src/operators/delay.ts @@ -87,9 +87,9 @@ class DelaySubscriber extends Subscriber { _schedule(scheduler) { this.active = true; - this.add(scheduler.schedule(this.delay, { + this.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, { source: this, destination: this.destination, scheduler: scheduler - }, DelaySubscriber.dispatch)); + })); } } diff --git a/src/operators/observeOn-support.ts b/src/operators/observeOn-support.ts index 7d200f741c..ef7332d310 100644 --- a/src/operators/observeOn-support.ts +++ b/src/operators/observeOn-support.ts @@ -35,22 +35,18 @@ export class ObserveOnSubscriber extends Subscriber { } _next(x) { - this.add(this.scheduler.schedule(this.delay, - new ObserveOnMessage(Notification.createNext(x), this.destination), - ObserveOnSubscriber.dispatch) - ); + this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, + new ObserveOnMessage(Notification.createNext(x), this.destination))); } _error(e) { - this.add(this.scheduler.schedule(this.delay, - new ObserveOnMessage(Notification.createError(e), this.destination), - ObserveOnSubscriber.dispatch)); + this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, + new ObserveOnMessage(Notification.createError(e), this.destination))); } _complete() { - this.add(this.scheduler.schedule(this.delay, - new ObserveOnMessage(Notification.createComplete(), this.destination), - ObserveOnSubscriber.dispatch)); + this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, + new ObserveOnMessage(Notification.createComplete(), this.destination))); } } diff --git a/src/operators/sampleTime.ts b/src/operators/sampleTime.ts index 18c1b4645d..7c23f18f09 100644 --- a/src/operators/sampleTime.ts +++ b/src/operators/sampleTime.ts @@ -25,7 +25,7 @@ class SampleTimeSubscriber extends Subscriber { constructor(destination: Observer, private delay: number, private scheduler: Scheduler) { super(destination); - this.add(scheduler.schedule(delay, { subscriber: this }, dispatchNotification)); + this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this })); } _next(value: T) { diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts index 93f8332e71..a02e08d2fa 100644 --- a/src/operators/throttle.ts +++ b/src/operators/throttle.ts @@ -32,7 +32,7 @@ class ThrottleSubscriber extends Subscriber { _next(x) { this.clearThrottle(); - this.add(this.throttled = this.scheduler.schedule(this.delay, { value: x, subscriber: this }, dispatchNext)); + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value: x, subscriber: this })); } throttledNext(x) { diff --git a/src/operators/timeout.ts b/src/operators/timeout.ts index 9e08bd6861..76f653a234 100644 --- a/src/operators/timeout.ts +++ b/src/operators/timeout.ts @@ -26,7 +26,7 @@ class TimeoutSubscriber extends Subscriber { constructor(destination: Observer, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { super(destination); let delay = waitFor; - scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); + scheduler.schedule(dispatchTimeout, delay, { subscriber: this }); } sendTimeoutError() { diff --git a/src/operators/timeoutWith.ts b/src/operators/timeoutWith.ts index 2d7af95103..a991e9a048 100644 --- a/src/operators/timeoutWith.ts +++ b/src/operators/timeoutWith.ts @@ -27,7 +27,7 @@ class TimeoutWithSubscriber extends Subscriber { constructor(destination: Observer, private waitFor: number, private withObservable: Observable, private scheduler: Scheduler) { super(destination); let delay = waitFor; - scheduler.schedule(delay, { subscriber: this }, dispatchTimeout); + scheduler.schedule(dispatchTimeout, delay, { subscriber: this }); } handleTimeout() { diff --git a/src/operators/windowTime.ts b/src/operators/windowTime.ts index 3bff0c79de..adf71492e8 100644 --- a/src/operators/windowTime.ts +++ b/src/operators/windowTime.ts @@ -33,11 +33,11 @@ class WindowTimeSubscriber extends Subscriber { super(destination); if (windowCreationInterval !== null && windowCreationInterval >= 0) { let window = this.openWindow(); - this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window, context: null }, dispatchWindowClose)) - this.add(scheduler.schedule(windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }, dispatchWindowCreation)) + this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber: this, window, context: null })) + this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler })) } else { let window = this.openWindow(); - this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window }, dispatchWindowTimeSpanOnly)); + this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, { subscriber: this, window })); } } @@ -96,7 +96,7 @@ function dispatchWindowCreation(state) { let window = subscriber.openWindow(); let action = >this; let context = { action, subscription: null }; - action.add(context.subscription = scheduler.schedule(windowTimeSpan, { subscriber, window, context }, dispatchWindowClose)); + action.add(context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber, window, context })); action.schedule(state); } diff --git a/src/schedulers/ImmediateScheduler.ts b/src/schedulers/ImmediateScheduler.ts index 82c033079c..b613d8e9b6 100644 --- a/src/schedulers/ImmediateScheduler.ts +++ b/src/schedulers/ImmediateScheduler.ts @@ -25,17 +25,17 @@ export default class ImmediateScheduler implements Scheduler { this.active = false; } - schedule(delay: number, state: any, work: (x?: any) => Subscription | void): Subscription { + schedule(work: (x?: any) => Subscription | void, delay: number = 0, state?: any): Subscription { return (delay <= 0) ? - this.scheduleNow(state, work) : - this.scheduleLater(state, work, delay); + this.scheduleNow(work, state) : + this.scheduleLater(work, delay, state); } - scheduleNow(state: any, work: (x?: any) => Subscription | void): Action { + scheduleNow(work: (x?: any) => Subscription | void, state?: any): Action { return new Action(this, work).schedule(state); } - scheduleLater(state: any, work: (x?: any) => Subscription | void, delay: number): Action { + scheduleLater(work: (x?: any) => Subscription | void, delay: number, state?: any): Action { return new FutureAction(this, work, delay).schedule(state); } } \ No newline at end of file diff --git a/src/schedulers/NextTickScheduler.ts b/src/schedulers/NextTickScheduler.ts index cfe01fc286..02a38cb6c8 100644 --- a/src/schedulers/NextTickScheduler.ts +++ b/src/schedulers/NextTickScheduler.ts @@ -5,7 +5,7 @@ import Action from './Action'; import NextTickAction from './NextTickAction'; export default class NextTickScheduler extends ImmediateScheduler { - scheduleNow(state: any, work: (x?: any) => Subscription | void): Action { + scheduleNow(work: (x?: any) => Subscription, state?: any): Action { return (this.scheduled ? new Action(this, work) : new NextTickAction(this, work)).schedule(state);