Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(combineLatest): accept array of observable as parameter #759

Merged
merged 1 commit into from
Nov 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion spec/observables/combineLatest-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect, hot, cold, expectObservable */
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;
Expand Down Expand Up @@ -29,6 +29,18 @@ describe('Observable.combineLatest', function () {
});
});

it('should accept array of observables', function () {
var firstSource = hot('----a----b----c----|');
var secondSource = hot('--d--e--f--g--|');
var expected = '----uv--wx-y--z----|';

var combined = Observable.combineLatest([firstSource, secondSource], function (a, b) {
return '' + a + b;
});

expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'});
});

it('should work with two nevers', function () {
var e1 = cold( '-');
var e1subs = '^';
Expand Down
19 changes: 18 additions & 1 deletion spec/operators/combineLatest-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect, hot, cold, expectObservable */
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;
Expand Down Expand Up @@ -145,6 +145,23 @@ describe('Observable.prototype.combineLatest', function () {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept array of observables', function () {
var e1 = hot('--a--^--b--c--|');
var e1subs = '^ !';
var e2 = hot('---e-^---f--g--|');
var e2subs = '^ !';
var e3 = hot('---h-^----i--j-|');
var e3subs = '^ !';
var expected = '-----wxyz-|';

var result = e1.combineLatest([e2, e3], function (x, y, z) { return x + y + z; });

expectObservable(result).toBe(expected, { w: 'bfi', x: 'cfi', y: 'cgi', z: 'cgj' });
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
expectSubscriptions(e3.subscriptions).toBe(e3subs);
});

it('should work with empty and error', function () {
var e1 = hot('----------|'); //empty
var e1subs = '^ !';
Expand Down
9 changes: 7 additions & 2 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ export class Observable<T> implements CoreOperators<T> {
}

// static method stubs
static combineLatest: <T>(...observables: Array<Observable<any> | ((...values: Array<any>) => T) | Scheduler>) => Observable<T>;
static combineLatest: <T>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => T) |
Scheduler>) => Observable<T>;
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
Expand Down Expand Up @@ -166,7 +169,9 @@ export class Observable<T> implements CoreOperators<T> {
bufferWhen: (closingSelector: () => Observable<any>) => Observable<T[]>;
catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
combineAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
combineLatest: <R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R)>) => Observable<R>;
combineLatest: <R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R)>) => Observable<R>;
concat: <R>(...observables: (Observable<any> | Scheduler)[]) => Observable<R>;
concatAll: () => Observable<any>;
concatMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
Expand Down
19 changes: 15 additions & 4 deletions src/operators/combineLatest-static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {ArrayObservable} from '../observables/ArrayObservable';
import {CombineLatestOperator} from './combineLatest-support';
import {Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';
import {isArray} from '../util/isArray';

/**
* Combines the values from observables passed as arguments. This is done by subscribing
Expand All @@ -14,15 +15,25 @@ import {isScheduler} from '../util/isScheduler';
* @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of
* the most recent values from each observable.
*/
export function combineLatest<R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R) | Scheduler>): Observable<R> {
let project, scheduler;
export function combineLatest<R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R) |
Scheduler>): Observable<R> {
let project: (...values: Array<any>) => R = null;
let scheduler: Scheduler = null;

if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop();
scheduler = <Scheduler>observables.pop();
}

if (typeof observables[observables.length - 1] === 'function') {
project = observables.pop();
project = <(...values: Array<any>) => R>observables.pop();
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
if (observables.length === 1 && isArray(observables[0])) {
observables = <Array<Observable<any>>>observables[0];
}

return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project));
Expand Down
19 changes: 15 additions & 4 deletions src/operators/combineLatest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Observable} from '../Observable';
import {ArrayObservable} from '../observables/ArrayObservable';
import {CombineLatestOperator} from './combineLatest-support';
import {isArray} from '../util/isArray';

/**
* Combines the values from this observable with values from observables passed as arguments. This is done by subscribing
Expand All @@ -12,11 +13,21 @@ import {CombineLatestOperator} from './combineLatest-support';
* @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of
* the most recent values from each observable.
*/
export function combineLatest<R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R)>): Observable<R> {
observables.unshift(this);
let project;
export function combineLatest<R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let project: (...values: Array<any>) => R = null;
if (typeof observables[observables.length - 1] === 'function') {
project = observables.pop();
project = <(...values: Array<any>) => R>observables.pop();
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
if (observables.length === 1 && isArray(observables[0])) {
observables = <Array<Observable<any>>>observables[0];
}

observables.unshift(this);

return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
}