From 3f7b8c54a7953bf4b403705d98b5c5bc072fbe6c Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 24 Nov 2015 10:06:58 -0200 Subject: [PATCH] fix(catch): fix catch to dispose old subscriptions Fix catch operator to not have anymore a shared underlying Subscription, and instead reset the subscription for each new observable replacing the caught error. This fixes a potential memory leak if catch is used as an infinite retry, because subscriptions would be retained since the beginning, and would increasing each time a catch is performed. Resolves issue #763. --- spec/operators/catch-spec.js | 57 +++++++++++++++++++++++------------- src/operators/catch.ts | 36 +++++++++++++++-------- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/spec/operators/catch-spec.js b/spec/operators/catch-spec.js index 4c96a904756..4610adf1add 100644 --- a/spec/operators/catch-spec.js +++ b/spec/operators/catch-spec.js @@ -25,7 +25,7 @@ describe('Observable.prototype.catch()', function () { it('should catch error and replace it with a cold Observable', function () { var e1 = hot('--a--b--#----| '); - var e1subs = '^ !'; + var e1subs = '^ ! '; var e2 = cold( '1-2-3-4-5-|'); var e2subs = ' ^ !'; var expected = '--a--b--1-2-3-4-5-|'; @@ -37,9 +37,23 @@ describe('Observable.prototype.catch()', function () { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should allow unsubscribing explicitly and early', function () { + var e1 = hot('--1-2-3-4-5-6---#'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var expected = '--1-2-3- '; + + var result = e1.catch(function () { + return Observable.of('X', 'Y', 'Z'); + }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + it('should catch error and replace it with a hot Observable', function () { var e1 = hot('--a--b--#----| '); - var e1subs = '^ !'; + var e1subs = '^ ! '; var e2 = hot('1-2-3-4-5-6-7-8-9-|'); var e2subs = ' ^ !'; var expected = '--a--b--5-6-7-8-9-|'; @@ -54,8 +68,8 @@ describe('Observable.prototype.catch()', function () { it('should catch and allow the cold observable to be repeated with the third ' + '(caught) argument', function () { var e1 = cold('--a--b--c--------| '); - var subs = ['^ !', - ' ^ !', + var subs = ['^ ! ', + ' ^ ! ', ' ^ !']; var expected = '--a--b----a--b----a--b--#'; @@ -81,7 +95,7 @@ describe('Observable.prototype.catch()', function () { it('should catch and allow the hot observable to proceed with the third ' + '(caught) argument', function () { var e1 = hot('--a--b--c----d---|'); - var subs = ['^ !', + var subs = ['^ ! ', ' ^ !']; var expected = '--a--b-------d---|'; @@ -132,41 +146,44 @@ describe('Observable.prototype.catch()', function () { it('should complete if you return Observable.empty()', function () { var e1 = hot('--a--b--#'); - var subs = '^ !'; + var e1subs = '^ !'; + var e2 = cold( '|'); + var e2subs = ' (^!)'; var expected = '--a--b--|'; - var result = e1.catch(function (err) { - return Observable.empty(); - }); + var result = e1.catch(function () { return e2; }); expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); }); it('should raise error if you return Observable.throw()', function () { var e1 = hot('--a--b--#'); - var subs = '^ !'; + var e1subs = '^ !'; + var e2 = cold( '#'); + var e2subs = ' (^!)'; var expected = '--a--b--#'; - var result = e1.catch(function (err) { - return Observable.throw('error'); - }); + var result = e1.catch(function () { return e2; }); expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); }); it('should never terminate if you return Observable.never()', function () { var e1 = hot('--a--b--#'); - var subs = '^ '; + var e1subs = '^ !'; + var e2 = cold( '-'); + var e2subs = ' ^'; var expected = '--a--b---'; - var result = e1.catch(function (err) { - return Observable.never(); - }); + var result = e1.catch(function () { return e2; }); expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); }); it('should pass the error as the first argument', function (done) { diff --git a/src/operators/catch.ts b/src/operators/catch.ts index 65cb83864a4..44cbe71ee7f 100644 --- a/src/operators/catch.ts +++ b/src/operators/catch.ts @@ -1,6 +1,7 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; +import {Subscription} from '../Subscription'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; @@ -20,12 +21,9 @@ export function _catch(selector: (err: any, caught: Observable) => Obser } class CatchOperator implements Operator { - selector: (err: any, caught: Observable) => Observable; caught: Observable; - source: Observable; - constructor(selector: (err: any, caught: Observable) => Observable) { - this.selector = selector; + constructor(private selector: (err: any, caught: Observable) => Observable) { } call(subscriber: Subscriber): Subscriber { @@ -34,15 +32,17 @@ class CatchOperator implements Operator { } class CatchSubscriber extends Subscriber { - selector: (err: any, caught: Observable) => Observable; - caught: Observable; + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private selector: (err: any, caught: Observable) => Observable, + private caught: Observable) { + super(null); + this.lastSubscription = this; + } - constructor(destination: Subscriber, - selector: (err: any, caught: Observable) => Observable, - caught: Observable) { - super(destination); - this.selector = selector; - this.caught = caught; + _next(value: T) { + this.destination.next(value); } _error(err) { @@ -50,7 +50,17 @@ class CatchSubscriber extends Subscriber { if (result === errorObject) { this.destination.error(errorObject.e); } else { - this.add(result.subscribe(this.destination)); + this.lastSubscription.unsubscribe(); + this.lastSubscription = result.subscribe(this.destination); } } + + _complete() { + this.lastSubscription.unsubscribe(); + this.destination.complete(); + } + + _unsubscribe() { + this.lastSubscription.unsubscribe(); + } }