From c65a098e889762b0de1dcb61958345c4d2f143c1 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 29 Jan 2017 08:22:00 +0100 Subject: [PATCH] fix(repeatWhen): resulting observable will wait for the source to complete, even if a hot notifier completes first. (#2209) After notifier completes wait for source observable to complete instead of ending stream immediately Closes #2054 --- spec/operators/repeatWhen-spec.ts | 6 +-- src/operator/repeatWhen.ts | 68 ++++++++++++++++--------------- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/spec/operators/repeatWhen-spec.ts b/spec/operators/repeatWhen-spec.ts index 00eb77ffdc..0df4a6672b 100644 --- a/spec/operators/repeatWhen-spec.ts +++ b/spec/operators/repeatWhen-spec.ts @@ -10,9 +10,9 @@ describe('Observable.prototype.repeatWhen', () => { const source = cold('-1--2--|'); const subs = ['^ ! ', ' ^ ! ', - ' ^ !']; + ' ^ !']; const notifier = hot('-------------r------------r-|'); - const expected = '-1--2---------1--2---------1|'; + const expected = '-1--2---------1--2---------1--2--|'; const result = source.repeatWhen((notifications: any) => notifier); @@ -320,4 +320,4 @@ describe('Observable.prototype.repeatWhen', () => { expectObservable(result).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); -}); \ No newline at end of file +}); diff --git a/src/operator/repeatWhen.ts b/src/operator/repeatWhen.ts index a49639727e..31d4e7e349 100644 --- a/src/operator/repeatWhen.ts +++ b/src/operator/repeatWhen.ts @@ -27,16 +27,15 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function repeatWhen(this: Observable, notifier: (notifications: Observable) => Observable): Observable { - return this.lift(new RepeatWhenOperator(notifier, this)); + return this.lift(new RepeatWhenOperator(notifier)); } class RepeatWhenOperator implements Operator { - constructor(protected notifier: (notifications: Observable) => Observable, - protected source: Observable) { + constructor(protected notifier: (notifications: Observable) => Observable) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, this.source)); + return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source)); } } @@ -50,6 +49,7 @@ class RepeatWhenSubscriber extends OuterSubscriber { private notifications: Subject; private retries: Observable; private retriesSubscription: Subscription; + private sourceIsBeingSubscribedTo: boolean = true; constructor(destination: Subscriber, private notifier: (notifications: Observable) => Observable, @@ -57,33 +57,31 @@ class RepeatWhenSubscriber extends OuterSubscriber { super(destination); } - complete() { - if (!this.isStopped) { + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.source.subscribe(this); + this.sourceIsBeingSubscribedTo = true; + } - let notifications = this.notifications; - let retries: any = this.retries; - let retriesSubscription = this.retriesSubscription; - - if (!retries) { - notifications = new Subject(); - retries = tryCatch(this.notifier)(notifications); - if (retries === errorObject) { - return super.complete(); - } - retriesSubscription = subscribeToResult(this, retries); - } else { - this.notifications = null; - this.retriesSubscription = null; - } + notifyComplete(innerSub: InnerSubscriber): void { + if (this.sourceIsBeingSubscribedTo === false) { + return super.complete(); + } + } - this.unsubscribe(); - this.closed = false; + complete() { + this.sourceIsBeingSubscribedTo = false; - this.notifications = notifications; - this.retries = retries; - this.retriesSubscription = retriesSubscription; + if (!this.isStopped) { + if (!this.retries) { + this.subscribeToRetries(); + } else if (this.retriesSubscription.closed) { + return super.complete(); + } - notifications.next(); + this.temporarilyUnsubscribe(); + this.notifications.next(); } } @@ -100,10 +98,17 @@ class RepeatWhenSubscriber extends OuterSubscriber { this.retries = null; } - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { + private subscribeToRetries() { + this.notifications = new Subject(); + const retries = tryCatch(this.notifier)(this.notifications); + if (retries === errorObject) { + return super.complete(); + } + this.retries = retries; + this.retriesSubscription = subscribeToResult(this, retries); + } + private temporarilyUnsubscribe() { const { notifications, retries, retriesSubscription } = this; this.notifications = null; this.retries = null; @@ -116,7 +121,6 @@ class RepeatWhenSubscriber extends OuterSubscriber { this.notifications = notifications; this.retries = retries; this.retriesSubscription = retriesSubscription; - - this.source.subscribe(this); } + }