From dc67d21915d4499914b1080b896bfda74b46dcc4 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Mon, 25 Jan 2016 14:26:18 -0800 Subject: [PATCH] fix(Subscriber): adds unsubscription when errors are thrown from user-land handlers. - slightly improves perf by improving shape of SafeSubscriber so JIT can optimize call patterns. related #1186 --- spec/Observable-spec.js | 122 +++++++++++++++++++++++++++++++++--- src/Observable.ts | 2 +- src/Subscriber.ts | 89 +++++++++++++++++++------- src/util/toSubscriber.ts | 4 +- src/util/tryOrThrowError.ts | 11 ---- 5 files changed, 181 insertions(+), 47 deletions(-) delete mode 100644 src/util/tryOrThrowError.ts diff --git a/spec/Observable-spec.js b/spec/Observable-spec.js index c30719a9c4..0b7a79b973 100644 --- a/spec/Observable-spec.js +++ b/spec/Observable-spec.js @@ -1,6 +1,7 @@ /* globals describe, it, expect */ var Rx = require('../dist/cjs/Rx'); var Promise = require('promise'); +var Subscriber = Rx.Subscriber; var Observable = Rx.Observable; function expectFullObserver(val) { @@ -118,6 +119,15 @@ describe('Observable', function () { expect(mutatedByComplete).toBe(true); }); + it('should work when subscribe is called with no arguments', function () { + var source = new Observable(function (subscriber) { + subscriber.next('foo'); + subscriber.complete(); + }); + + source.subscribe(); + }); + it('should return a Subscription that calls the unsubscribe function returned by the subscriber', function () { var unsubscribeCalled = false; @@ -136,29 +146,89 @@ describe('Observable', function () { expect(unsubscribeCalled).toBe(true); }); + it('should not run unsubscription logic when an error is thrown sending messages synchronously', function () { + var messageError = false; + var messageErrorValue = false; + var unsubscribeCalled = false; + + var sub; + var source = new Observable(function (observer) { + observer.next('boo!'); + return function () { + unsubscribeCalled = true; + }; + }); + + try { + sub = source.subscribe(function (x) { throw x; }); + } catch (e) { + messageError = true; + messageErrorValue = e; + } + + expect(sub).toBe(undefined); + expect(unsubscribeCalled).toBe(false); + expect(messageError).toBe(true); + expect(messageErrorValue).toBe('boo!'); + }); + + it('should dispose of the subscriber when an error is thrown sending messages synchronously', function () { + var messageError = false; + var messageErrorValue = false; + var unsubscribeCalled = false; + + var sub; + var subscriber = new Subscriber(function (x) { throw x; }); + var source = new Observable(function (observer) { + observer.next('boo!'); + return function () { + unsubscribeCalled = true; + }; + }); + + try { + sub = source.subscribe(subscriber); + } catch (e) { + messageError = true; + messageErrorValue = e; + } + + expect(sub).toBe(undefined); + expect(subscriber.isUnsubscribed).toBe(true); + expect(unsubscribeCalled).toBe(false); + expect(messageError).toBe(true); + expect(messageErrorValue).toBe('boo!'); + }); + describe('when called with an anonymous observer', function () { - it('should accept an anonymous observer with just a next function', function () { - Observable.of(1).subscribe({ + it('should accept an anonymous observer with just a next function and call the next function in the context of the anonymous observer', function () { + var o = { next: function next(x) { + expect(this).toBe(o); expect(x).toBe(1); } - }); + }; + Observable.of(1).subscribe(o); }); - it('should accept an anonymous observer with just an error function', function () { - Observable.throw('bad').subscribe({ + it('should accept an anonymous observer with just an error function and call the error function in the context of the anonymous observer', function () { + var o = { error: function error(err) { + expect(this).toBe(o); expect(err).toBe('bad'); } - }); + }; + Observable.throw('bad').subscribe(o); }); - it('should accept an anonymous observer with just a complete function', function (done) { - Observable.empty().subscribe({ + it('should accept an anonymous observer with just a complete function and call the complete function in the context of the anonymous observer', function (done) { + var o = { complete: function complete() { + expect(this).toBe(o); done(); } - }); + }; + Observable.empty().subscribe(o); }); it('should accept an anonymous observer with no functions at all', function () { @@ -166,6 +236,38 @@ describe('Observable', function () { Observable.empty().subscribe({}); }).not.toThrow(); }); + + it('should not run unsubscription logic when an error is thrown sending messages synchronously to an anonymous observer', function () { + var messageError = false; + var messageErrorValue = false; + var unsubscribeCalled = false; + + var o = { + next: function next(x) { + expect(this).toBe(o); + throw x; + } + }; + var sub; + var source = new Observable(function (observer) { + observer.next('boo!'); + return function () { + unsubscribeCalled = true; + }; + }); + + try { + sub = source.subscribe(o); + } catch (e) { + messageError = true; + messageErrorValue = e; + } + + expect(sub).toBe(undefined); + expect(unsubscribeCalled).toBe(false); + expect(messageError).toBe(true); + expect(messageErrorValue).toBe('boo!'); + }); }); }); }); @@ -188,4 +290,4 @@ describe('Observable.create', function () { result.subscribe(function () { }); expect(called).toBe(true); }); -}); \ No newline at end of file +}); diff --git a/src/Observable.ts b/src/Observable.ts index 9cb5336df1..f08c6f8a52 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -113,7 +113,7 @@ export class Observable implements CoreOperators { const subscriber = toSubscriber(observerOrNext, error, complete); if (operator) { - subscriber.add(this._subscribe(this.operator.call(subscriber))); + subscriber.add(this._subscribe(operator.call(subscriber))); } else { subscriber.add(this._subscribe(subscriber)); } diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 955b30a55b..461cee0dc6 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -1,6 +1,6 @@ -import {noop} from './util/noop'; -import {throwError} from './util/throwError'; -import {tryOrThrowError} from './util/tryOrThrowError'; +import {isFunction} from './util/isFunction'; +import {tryCatch} from './util/tryCatch'; +import {errorObject} from './util/errorObject'; import {Observer} from './Observer'; import {Subscription} from './Subscription'; @@ -12,26 +12,38 @@ export class Subscriber extends Subscription implements Observer { static create(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber { - return new SafeSubscriber(next, error, complete); + return new Subscriber(next, error, complete); } protected isStopped: boolean = false; protected destination: Observer; - constructor(destination: Observer = emptyObserver) { + constructor(destinationOrNext?: Observer | ((value: T) => void), + error?: (e?: any) => void, + complete?: () => void) { super(); - this.destination = destination; - - if (!destination || - (destination instanceof Subscriber) || - (destination === emptyObserver)) { - return; + switch (arguments.length) { + case 0: + this.destination = emptyObserver; + break; + case 1: + if (!destinationOrNext) { + this.destination = emptyObserver; + break; + } + if (typeof destinationOrNext === 'object') { + if (destinationOrNext instanceof Subscriber) { + this.destination = (> destinationOrNext); + } else { + this.destination = new SafeSubscriber(this, > destinationOrNext); + } + break; + } + default: + this.destination = new SafeSubscriber(this, <((value: T) => void)> destinationOrNext, error, complete); + break; } - - if (typeof destination.next !== 'function') { destination.next = noop; } - if (typeof destination.error !== 'function') { destination.error = throwError; } - if (typeof destination.complete !== 'function') { destination.complete = noop; } } next(value?: T): void { @@ -83,25 +95,48 @@ export class Subscriber extends Subscription implements Observer { class SafeSubscriber extends Subscriber { - constructor(next?: (x?: T) => void, + private _context: any; + + constructor(private _parent: Subscriber, + observerOrNext?: Observer | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); - this._next = (typeof next === 'function') && tryOrThrowError(next) || null; - this._error = (typeof error === 'function') && tryOrThrowError(error) || throwError; - this._complete = (typeof complete === 'function') && tryOrThrowError(complete) || null; + + let next: ((value: T) => void); + let context: any = this; + + if (isFunction(observerOrNext)) { + next = (<((value: T) => void)> observerOrNext); + } else if (observerOrNext) { + context = observerOrNext; + next = (> observerOrNext).next; + error = (> observerOrNext).error; + complete = (> observerOrNext).complete; + } + + this._context = context; + this._next = next; + this._error = error; + this._complete = complete; } next(value?: T): void { if (!this.isStopped && this._next) { - this._next(value); + if (tryCatch(this._next).call(this._context, value) === errorObject) { + this.unsubscribe(); + throw errorObject.e; + } } } error(err?: any): void { if (!this.isStopped) { if (this._error) { - this._error(err); + if (tryCatch(this._error).call(this._context, err) === errorObject) { + this.unsubscribe(); + throw errorObject.e; + } } this.unsubscribe(); } @@ -110,9 +145,19 @@ class SafeSubscriber extends Subscriber { complete(): void { if (!this.isStopped) { if (this._complete) { - this._complete(); + if (tryCatch(this._complete).call(this._context) === errorObject) { + this.unsubscribe(); + throw errorObject.e; + } } this.unsubscribe(); } } + + protected _unsubscribe(): void { + const { _parent } = this; + this._context = null; + this._parent = null; + _parent.unsubscribe(); + } } diff --git a/src/util/toSubscriber.ts b/src/util/toSubscriber.ts index f20dcc3495..72c8316f87 100644 --- a/src/util/toSubscriber.ts +++ b/src/util/toSubscriber.ts @@ -12,10 +12,8 @@ export function toSubscriber( return (> next); } else if (typeof next[rxSubscriber] === 'function') { return next[rxSubscriber](); - } else { - return new Subscriber(> next); } } - return Subscriber.create(<((value: T) => void)> next, error, complete); + return new Subscriber(next, error, complete); } diff --git a/src/util/tryOrThrowError.ts b/src/util/tryOrThrowError.ts deleted file mode 100644 index 4bd4038868..0000000000 --- a/src/util/tryOrThrowError.ts +++ /dev/null @@ -1,11 +0,0 @@ -export function tryOrThrowError(target: Function): (x?: any) => any { - function tryCatcher() { - try { - ( tryCatcher).target.apply(this, arguments); - } catch (e) { - throw e; - } - } - ( tryCatcher).target = target; - return tryCatcher; -}