Skip to content

Commit

Permalink
Merge pull request #13 from ReactiveX/master
Browse files Browse the repository at this point in the history
fix(find): force unsubscribe when it completes or errors (ReactiveX#2550)
  • Loading branch information
GulajavaMinistudio authored May 17, 2017
2 parents 3615f06 + df78c4c commit f0a4391
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spec/helpers/doNotUnsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
///<reference path='../../typings/index.d.ts'/>
import * as Rx from '../../dist/cjs/Rx';

export function doNotUnsubscribe<T>(ob: Rx.Observable<T>): Rx.Observable<T> {
return ob.lift(new DoNotUnsubscribeOperator());
}

class DoNotUnsubscribeOperator<T, R> implements Rx.Operator<T, R> {
call(subscriber: Rx.Subscriber<R>, source: any): any {
return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber));
}
}

class DoNotUnsubscribeSubscriber<T> extends Rx.Subscriber<T> {
unsubscribe() {} // tslint:disable-line no-empty
}
28 changes: 28 additions & 0 deletions spec/operators/find-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe';

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
Expand Down Expand Up @@ -162,6 +163,33 @@ describe('Observable.prototype.find', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe from source when complete, even if following operator does not unsubscribe', () => {
const values = {a: 3, b: 9, c: 15, d: 20};
const source = hot('---a--b--c--d---|', values);
const subs = '^ ! ';
const expected = '---------(c|) ';

const predicate = function (x) { return x % 5 === 0; };

expectObservable((<any>source).find(predicate).let(doNotUnsubscribe)).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe from source when predicate function errors,' +
' even if followring operator does not unsubscribe', () => {

const source = hot('--a--b--c--|');
const subs = '^ !';
const expected = '--#';

const predicate = function (value) {
throw 'error';
};

expectObservable((<any>source).find(predicate).let(doNotUnsubscribe)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should support type guards without breaking previous behavior', () => {
// tslint:disable no-unused-variable

Expand Down
2 changes: 2 additions & 0 deletions src/operator/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {

destination.next(value);
destination.complete();
this.unsubscribe();
}

protected _next(value: T): void {
Expand All @@ -97,6 +98,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {
}
} catch (err) {
this.destination.error(err);
this.unsubscribe();
}
}

Expand Down

0 comments on commit f0a4391

Please sign in to comment.