From 2bce0e39d51fb2e9bee3e2419e1b66a6c6e48e9e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sun, 17 May 2020 20:27:56 -0500 Subject: [PATCH] fix: errors thrown from iterables now properly propagated (#5444) * fix: errors thrown from iterables now properly propagated * chore: fix lint failures Co-authored-by: Nicholas Jamieson --- spec/operators/mergeMap-spec.ts | 43 ++++++++++++++++++------ spec/util/subscribeToResult-spec.ts | 22 ++++++++++++ src/internal/util/subscribeToIterable.ts | 11 ++++-- 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index efe576c528..9342c7153b 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,14 +1,22 @@ import { expect } from 'chai'; -import { mergeMap, map } from 'rxjs/operators'; +import { mergeMap, map, delay } from 'rxjs/operators'; import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { asInteropObservable } from '../helpers/interop-helper'; +import { TestScheduler } from 'rxjs/testing'; +import { observableMatcher } from '../helpers/observableMatcher'; -declare const type: Function; declare const asDiagram: Function; /** @test {mergeMap} */ describe('mergeMap', () => { + let rxTest: TestScheduler; + + // TODO: Convert the rest of these tests to use run mode! + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); + }); + asDiagram('mergeMap(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014| )') ('should map-and-flatten each item to an Observable', () => { const e1 = hot('--1-----3--5-------|'); @@ -821,14 +829,27 @@ describe('mergeMap', () => { }, 0); }); - type('should support type signatures', () => { - let o: Observable; - - /* tslint:disable:no-unused-variable */ - let a1: Observable = o.pipe(mergeMap(x => x.toString())); - let a2: Observable = o.pipe(mergeMap(x => x.toString(), 3)); - let a3: Observable<{ o: number; i: string; }> = o.pipe(mergeMap(x => x.toString(), (o, i) => ({ o, i }))); - let a4: Observable<{ o: number; i: string; }> = o.pipe(mergeMap(x => x.toString(), (o, i) => ({ o, i }), 3)); - /* tslint:enable:no-unused-variable */ + // NOTE: From https://github.com/ReactiveX/rxjs/issues/5436 + it('should properly handle errors from iterables that are processed after some async', () => { + rxTest.run(({ cold, expectObservable }) => { + const noXError = new Error('we do not allow x'); + const source = cold('-----A------------B-----|', { A: ['o', 'o', 'o'], B: ['o', 'x', 'o']}); + const expected = ' -----(ooo)--------(o#)'; + const iterable = function* (data: string[]) { + for (let d of data) { + if (d === 'x') { + throw noXError; + } + yield d; + } + }; + const result = source.pipe( + mergeMap(x => of(x).pipe( + delay(0), + mergeMap(iterable) + )) + ); + expectObservable(result).toBe(expected, undefined, noXError); + }); }); }); diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts index b3b0bfd929..d3ece09eb5 100644 --- a/spec/util/subscribeToResult-spec.ts +++ b/spec/util/subscribeToResult-spec.ts @@ -119,6 +119,28 @@ describe('subscribeToResult', () => { expect(expected).to.be.equal(42); }); + // NOTE: From https://github.com/ReactiveX/rxjs/issues/5436 + it('should pass along errors from an iterable', () => { + const generator = function* () { + yield 1; + yield 2; + yield 3; + throw 'bad'; + }; + + const results: any[] = []; + let foundError: any = null; + + const subscriber = new OuterSubscriber({ + next: x => results.push(x), + error: err => foundError = err + }); + + subscribeToResult(subscriber, generator()); + expect(results).to.deep.equal([1, 2, 3]); + expect(foundError).to.equal('bad'); + }); + it('should subscribe to to an object that implements Symbol.observable', (done) => { const observableSymbolObject = { [$$symbolObservable]: () => of(42) }; diff --git a/src/internal/util/subscribeToIterable.ts b/src/internal/util/subscribeToIterable.ts index 6d20f689ff..b34c81b4e5 100644 --- a/src/internal/util/subscribeToIterable.ts +++ b/src/internal/util/subscribeToIterable.ts @@ -2,9 +2,16 @@ import { Subscriber } from '../Subscriber'; import { iterator as Symbol_iterator } from '../symbol/iterator'; export const subscribeToIterable = (iterable: Iterable) => (subscriber: Subscriber) => { - const iterator = iterable[Symbol_iterator](); + const iterator = (iterable as any)[Symbol_iterator](); + do { - const item = iterator.next(); + let item: IteratorResult; + try { + item = iterator.next(); + } catch (err) { + subscriber.error(err); + return; + } if (item.done) { subscriber.complete(); break;