130 lines
6.2 KiB
JavaScript
Executable File
130 lines
6.2 KiB
JavaScript
Executable File
import { isScheduler } from '../util/isScheduler';
|
|
import { isArray } from '../util/isArray';
|
|
import { ArrayObservable } from './ArrayObservable';
|
|
import { CombineLatestOperator } from '../operators/combineLatest';
|
|
/* tslint:enable:max-line-length */
|
|
/**
|
|
* Combines multiple Observables to create an Observable whose values are
|
|
* calculated from the latest values of each of its input Observables.
|
|
*
|
|
* <span class="informal">Whenever any input Observable emits a value, it
|
|
* computes a formula using the latest values from all the inputs, then emits
|
|
* the output of that formula.</span>
|
|
*
|
|
* <img src="./img/combineLatest.png" width="100%">
|
|
*
|
|
* `combineLatest` combines the values from all the Observables passed as
|
|
* arguments. This is done by subscribing to each Observable in order and,
|
|
* whenever any Observable emits, collecting an array of the most recent
|
|
* values from each Observable. So if you pass `n` Observables to operator,
|
|
* returned Observable will always emit an array of `n` values, in order
|
|
* corresponding to order of passed Observables (value from the first Observable
|
|
* on the first place and so on).
|
|
*
|
|
* Static version of `combineLatest` accepts either an array of Observables
|
|
* or each Observable can be put directly as an argument. Note that array of
|
|
* Observables is good choice, if you don't know beforehand how many Observables
|
|
* you will combine. Passing empty array will result in Observable that
|
|
* completes immediately.
|
|
*
|
|
* To ensure output array has always the same length, `combineLatest` will
|
|
* actually wait for all input Observables to emit at least once,
|
|
* before it starts emitting results. This means if some Observable emits
|
|
* values before other Observables started emitting, all that values but last
|
|
* will be lost. On the other hand, is some Observable does not emit value but
|
|
* completes, resulting Observable will complete at the same moment without
|
|
* emitting anything, since it will be now impossible to include value from
|
|
* completed Observable in resulting array. Also, if some input Observable does
|
|
* not emit any value and never completes, `combineLatest` will also never emit
|
|
* and never complete, since, again, it will wait for all streams to emit some
|
|
* value.
|
|
*
|
|
* If at least one Observable was passed to `combineLatest` and all passed Observables
|
|
* emitted something, resulting Observable will complete when all combined
|
|
* streams complete. So even if some Observable completes, result of
|
|
* `combineLatest` will still emit values when other Observables do. In case
|
|
* of completed Observable, its value from now on will always be the last
|
|
* emitted value. On the other hand, if any Observable errors, `combineLatest`
|
|
* will error immediately as well, and all other Observables will be unsubscribed.
|
|
*
|
|
* `combineLatest` accepts as optional parameter `project` function, which takes
|
|
* as arguments all values that would normally be emitted by resulting Observable.
|
|
* `project` can return any kind of value, which will be then emitted by Observable
|
|
* instead of default array. Note that `project` does not take as argument that array
|
|
* of values, but values themselves. That means default `project` can be imagined
|
|
* as function that takes all its arguments and puts them into an array.
|
|
*
|
|
*
|
|
* @example <caption>Combine two timer Observables</caption>
|
|
* const firstTimer = Rx.Observable.timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
|
|
* const secondTimer = Rx.Observable.timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now
|
|
* const combinedTimers = Rx.Observable.combineLatest(firstTimer, secondTimer);
|
|
* combinedTimers.subscribe(value => console.log(value));
|
|
* // Logs
|
|
* // [0, 0] after 0.5s
|
|
* // [1, 0] after 1s
|
|
* // [1, 1] after 1.5s
|
|
* // [2, 1] after 2s
|
|
*
|
|
*
|
|
* @example <caption>Combine an array of Observables</caption>
|
|
* const observables = [1, 5, 10].map(
|
|
* n => Rx.Observable.of(n).delay(n * 1000).startWith(0) // emit 0 and then emit n after n seconds
|
|
* );
|
|
* const combined = Rx.Observable.combineLatest(observables);
|
|
* combined.subscribe(value => console.log(value));
|
|
* // Logs
|
|
* // [0, 0, 0] immediately
|
|
* // [1, 0, 0] after 1s
|
|
* // [1, 5, 0] after 5s
|
|
* // [1, 5, 10] after 10s
|
|
*
|
|
*
|
|
* @example <caption>Use project function to dynamically calculate the Body-Mass Index</caption>
|
|
* var weight = Rx.Observable.of(70, 72, 76, 79, 75);
|
|
* var height = Rx.Observable.of(1.76, 1.77, 1.78);
|
|
* var bmi = Rx.Observable.combineLatest(weight, height, (w, h) => w / (h * h));
|
|
* bmi.subscribe(x => console.log('BMI is ' + x));
|
|
*
|
|
* // With output to console:
|
|
* // BMI is 24.212293388429753
|
|
* // BMI is 23.93948099205209
|
|
* // BMI is 23.671253629592222
|
|
*
|
|
*
|
|
* @see {@link combineAll}
|
|
* @see {@link merge}
|
|
* @see {@link withLatestFrom}
|
|
*
|
|
* @param {ObservableInput} observable1 An input Observable to combine with other Observables.
|
|
* @param {ObservableInput} observable2 An input Observable to combine with other Observables.
|
|
* More than one input Observables may be given as arguments
|
|
* or an array of Observables may be given as the first argument.
|
|
* @param {function} [project] An optional function to project the values from
|
|
* the combined latest values into a new value on the output Observable.
|
|
* @param {Scheduler} [scheduler=null] The IScheduler to use for subscribing to
|
|
* each input Observable.
|
|
* @return {Observable} An Observable of projected values from the most recent
|
|
* values from each input Observable, or an array of the most recent values from
|
|
* each input Observable.
|
|
* @static true
|
|
* @name combineLatest
|
|
* @owner Observable
|
|
*/
|
|
export function combineLatest(...observables) {
|
|
let project = null;
|
|
let scheduler = null;
|
|
if (isScheduler(observables[observables.length - 1])) {
|
|
scheduler = observables.pop();
|
|
}
|
|
if (typeof observables[observables.length - 1] === 'function') {
|
|
project = observables.pop();
|
|
}
|
|
// if the first and only other argument besides the resultSelector is an array
|
|
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
|
|
if (observables.length === 1 && isArray(observables[0])) {
|
|
observables = observables[0];
|
|
}
|
|
return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project));
|
|
}
|
|
//# sourceMappingURL=combineLatest.js.map
|