Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

combineLatest.ts 22KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import { Observable } from '../Observable';
  2. import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
  3. import { isScheduler } from '../util/isScheduler';
  4. import { isArray } from '../util/isArray';
  5. import { Subscriber } from '../Subscriber';
  6. import { OuterSubscriber } from '../OuterSubscriber';
  7. import { Operator } from '../Operator';
  8. import { InnerSubscriber } from '../InnerSubscriber';
  9. import { subscribeToResult } from '../util/subscribeToResult';
  10. import { fromArray } from './fromArray';
  // Unique sentinel object: CombineLatestSubscriber pushes it into its `values`
  // array for every input it receives and later compares slots against it by
  // identity (`=== NONE`) to detect inputs that have not produced a value yet.
  11. const NONE = {};
  12. /* tslint:disable:max-line-length */
  13. // Called with a single array of sources followed by a result selector: the array is "auto-spread", i.e. the selector receives the latest values as separate arguments
  14. /** @deprecated resultSelector no longer supported, pipe to map instead */
  15. export function combineLatest<O1 extends ObservableInput<any>, R>(sources: [O1], resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
  16. /** @deprecated resultSelector no longer supported, pipe to map instead */
  17. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(sources: [O1, O2], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
  18. /** @deprecated resultSelector no longer supported, pipe to map instead */
  19. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(sources: [O1, O2, O3], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
  20. /** @deprecated resultSelector no longer supported, pipe to map instead */
  21. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
  22. /** @deprecated resultSelector no longer supported, pipe to map instead */
  23. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
  24. /** @deprecated resultSelector no longer supported, pipe to map instead */
  25. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5, O6], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;
  26. /** @deprecated resultSelector no longer supported, pipe to map instead */
  27. export function combineLatest<O extends ObservableInput<any>, R>(sources: O[], resultSelector: (...args: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;
  28. // Variadic call (each source passed as its own argument) followed by a result selector
  29. /** @deprecated resultSelector no longer supported, pipe to map instead */
  30. export function combineLatest<O1 extends ObservableInput<any>, R>(v1: O1, resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
  31. /** @deprecated resultSelector no longer supported, pipe to map instead */
  32. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(v1: O1, v2: O2, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
  33. /** @deprecated resultSelector no longer supported, pipe to map instead */
  34. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
  35. /** @deprecated resultSelector no longer supported, pipe to map instead */
  36. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
  37. /** @deprecated resultSelector no longer supported, pipe to map instead */
  38. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
  39. /** @deprecated resultSelector no longer supported, pipe to map instead */
  40. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;
  41. // With a scheduler (deprecated)
  42. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  43. export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
  44. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  45. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
  46. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  47. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
  48. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  49. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
  50. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  51. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
  52. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  53. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
  54. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  55. export function combineLatest<O extends ObservableInput<any>>(sources: O[], scheduler: SchedulerLike): Observable<ObservedValueOf<O>[]>;
  56. // Preferred form: a single array of sources, with no result selector and no scheduler
  57. export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
  58. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
  59. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
  60. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
  61. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
  62. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
  63. export function combineLatest<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;
  64. // Variadic calls: each source passed as its own argument, optionally followed by a scheduler
  65. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  66. export function combineLatest<O1 extends ObservableInput<any>>(v1: O1, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
  67. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  68. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
  69. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  70. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
  71. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  72. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
  73. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  74. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
  75. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  76. export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
  77. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  78. export function combineLatest<O extends ObservableInput<any>>(...observables: O[]): Observable<any[]>;
  79. /** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
  80. export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
  81. /** @deprecated resultSelector no longer supported, pipe to map instead */
  82. export function combineLatest<O extends ObservableInput<any>, R>(array: O[], resultSelector: (...values: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;
  83. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  84. export function combineLatest<O extends ObservableInput<any>>(...observables: Array<O | SchedulerLike>): Observable<any[]>;
  85. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  86. export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike>): Observable<R>;
  87. /** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
  88. export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R) | SchedulerLike>): Observable<R>;
  89. /* tslint:enable:max-line-length */
  /**
   * Combines multiple Observables to create an Observable whose values are
   * calculated from the latest values of each of its input Observables.
   *
   * <span class="informal">Whenever any input Observable emits a value, it
   * computes a formula using the latest values from all the inputs, then emits
   * the output of that formula.</span>
   *
   * ![](combineLatest.png)
   *
   * `combineLatest` subscribes to every input Observable in order and, whenever
   * any of them emits, collects an array holding the most recent value of each
   * input. Given `n` inputs, every emission is therefore an array of `n` values
   * whose positions match the order in which the inputs were passed (the value
   * of the first input comes first, and so on).
   *
   * The inputs can be passed either as a single array or as individual
   * arguments. The array form is the recommended one (the variadic overloads
   * are marked as deprecated) and is also the natural choice when the number of
   * inputs is not known in advance. Passing an empty array yields an Observable
   * that completes immediately.
   *
   * To guarantee that the output array always has the same length,
   * `combineLatest` waits until every input has emitted at least once before it
   * emits anything. Consequently, if one input emits several values before the
   * others have started emitting, all of those values except the last one are
   * lost. Likewise, if some input never emits and never completes,
   * `combineLatest` never emits and never completes either.
   *
   * If an input completes without ever having emitted, no combined value can be
   * produced from then on, because its slot in the result can never be filled.
   * NOTE(review): earlier revisions of this documentation stated that the
   * result completes at that very moment; `CombineLatestSubscriber.notifyComplete`
   * in this file only completes the output once its `active` counter reaches
   * zero, i.e. after a completion notification for every input. Confirm the
   * intended behavior before relying on either description.
   *
   * Once every input has emitted, the result keeps emitting for as long as any
   * input keeps emitting: a completed input simply contributes its last value
   * from then on, and the result completes when all inputs have completed. If
   * any input errors, `combineLatest` errors immediately as well and all other
   * inputs are unsubscribed.
   *
   * An optional `project` function (result selector) may be passed after the
   * inputs. It receives the latest values as separate arguments, not as an
   * array, and whatever it returns is emitted instead of the default array.
   * This parameter is deprecated: pipe the result to `map` instead.
   *
   * ## Examples
   * ### Combine two timer Observables
   * ```ts
   * import { combineLatest, timer } from 'rxjs';
   *
   * const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
   * const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0.5s from now
   * const combinedTimers = combineLatest([firstTimer, secondTimer]);
   * combinedTimers.subscribe(value => console.log(value));
   * // Logs
   * // [0, 0] after 0.5s
   * // [1, 0] after 1s
   * // [1, 1] after 1.5s
   * // [2, 1] after 2s
   * ```
   *
   * ### Combine an array of Observables
   * ```ts
   * import { combineLatest, of } from 'rxjs';
   * import { delay, startWith } from 'rxjs/operators';
   *
   * const observables = [1, 5, 10].map(
   *   n => of(n).pipe(
   *     delay(n * 1000), // emit 0 and then emit n after n seconds
   *     startWith(0),
   *   )
   * );
   * const combined = combineLatest(observables);
   * combined.subscribe(value => console.log(value));
   * // Logs
   * // [0, 0, 0] immediately
   * // [1, 0, 0] after 1s
   * // [1, 5, 0] after 5s
   * // [1, 5, 10] after 10s
   * ```
   *
   * ### Use the map operator to dynamically calculate the Body-Mass Index
   * ```ts
   * import { combineLatest, of } from 'rxjs';
   * import { map } from 'rxjs/operators';
   *
   * const weight = of(70, 72, 76, 79, 75);
   * const height = of(1.76, 1.77, 1.78);
   * const bmi = combineLatest([weight, height]).pipe(
   *   map(([w, h]) => w / (h * h)),
   * );
   * bmi.subscribe(x => console.log('BMI is ' + x));
   *
   * // With output to console:
   * // BMI is 24.212293388429753
   * // BMI is 23.93948099205209
   * // BMI is 23.671253629592222
   * ```
   *
   * @see {@link combineAll}
   * @see {@link merge}
   * @see {@link withLatestFrom}
   *
   * @param {ObservableInput} observable1 An input Observable to combine with other Observables.
   * @param {ObservableInput} observable2 An input Observable to combine with other Observables.
   * More than one input Observable may be given as arguments,
   * or an array of Observables may be given as the first argument.
   * @param {function} [project] An optional function to project the values from
   * the combined latest values into a new value on the output Observable
   * (deprecated, pipe to `map` instead).
   * @param {SchedulerLike} [scheduler] An optional {@link SchedulerLike}
   * (deprecated); when present it is handed to `fromArray` together with the
   * list of inputs.
   * @return {Observable} An Observable of projected values from the most recent
   * values from each input Observable, or an array of the most recent values from
   * each input Observable.
   */
  export function combineLatest<O extends ObservableInput<any>, R>(
    ...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
  ): Observable<R> {
    // Optional trailing arguments are peeled off from the right, in this order:
    // first a scheduler, then a result selector.
    const scheduler = isScheduler(observables[observables.length - 1])
      ? (observables.pop() as SchedulerLike)
      : undefined;
    const resultSelector = typeof observables[observables.length - 1] === 'function'
      ? (observables.pop() as (...values: Array<any>) => R)
      : undefined;

    // If the only remaining argument is an array, assume the call was
    // `combineLatest([obs1, obs2, obs3], resultSelector)` and use that array
    // as the list of inputs.
    if (observables.length === 1 && isArray(observables[0])) {
      observables = observables[0] as any;
    }

    // Emit the inputs one by one and let the operator's subscriber collect them.
    return fromArray(observables, scheduler).lift(new CombineLatestOperator(resultSelector));
  }
  /**
   * Operator that `combineLatest` passes to `lift`: when called, it subscribes
   * a new `CombineLatestSubscriber` to the given source.
   */
  export class CombineLatestOperator<T, R> implements Operator<T, R> {
    /**
     * @param resultSelector Optional projection handed to every subscriber
     * created by `call`.
     */
    constructor(private resultSelector?: (...values: Array<any>) => R) {}

    /**
     * Wraps the downstream subscriber in a `CombineLatestSubscriber` and
     * subscribes it to `source`.
     *
     * @param subscriber Downstream subscriber that receives the combined output.
     * @param source Object exposing a `subscribe` method.
     * @return Whatever `source.subscribe` returns.
     */
    call(subscriber: Subscriber<R>, source: any): any {
      const combiner = new CombineLatestSubscriber(subscriber, this.resultSelector);
      return source.subscribe(combiner);
    }
  }
  /**
   * Subscriber that collects every input it is given, subscribes to all of
   * them once the upstream completes, and forwards the combined latest values
   * downstream each time an input emits (only after every input has emitted).
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
    /** Count of inputs for which no completion notification has arrived yet. */
    private active = 0;
    /** Latest value per input index; a slot holds `NONE` until that input emits. */
    private values: any[] = [];
    /** Inputs collected by `_next`, in arrival order. */
    private observables: any[] = [];
    /** Count of inputs that have not emitted yet; set when `_complete` runs. */
    private toRespond?: number;

    /**
     * @param destination Downstream subscriber.
     * @param resultSelector Optional projection applied to the latest values
     * before they are forwarded.
     */
    constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
      super(destination);
    }

    /** Records one input and reserves a `NONE` slot for its future value. */
    protected _next(observable: any) {
      this.values.push(NONE);
      this.observables.push(observable);
    }

    /**
     * Runs when the upstream list of inputs is exhausted: completes right away
     * if there are no inputs, otherwise subscribes to each one, tagging the
     * subscription with the input's index.
     */
    protected _complete() {
      const sources = this.observables;
      const count = sources.length;
      if (count === 0) {
        this.destination.complete!();
        return;
      }
      // Both counters must be set before the first subscription is made.
      this.active = count;
      this.toRespond = count;
      for (let index = 0; index < count; index++) {
        this.add(subscribeToResult(this, sources[index], undefined, index));
      }
    }

    /** Completes the destination once the `active` counter drops to zero. */
    notifyComplete(unused: Subscriber<R>): void {
      this.active -= 1;
      if (this.active === 0) {
        this.destination.complete!();
      }
    }

    /**
     * Stores `innerValue` in the slot for `outerIndex` and, if no input is
     * still waiting for its first value, forwards the combined result.
     */
    notifyNext(_outerValue: T, innerValue: R,
               outerIndex: number): void {
      const latest = this.values;
      const previous = latest[outerIndex];

      // Number of inputs still missing a first value after this notification.
      let pending: number;
      if (!this.toRespond) {
        pending = 0;
      } else if (previous === NONE) {
        // First value for this input: one fewer input to wait for.
        pending = --this.toRespond;
      } else {
        pending = this.toRespond;
      }

      latest[outerIndex] = innerValue;
      if (pending !== 0) {
        return;
      }

      if (this.resultSelector) {
        this._tryResultSelector(latest);
      } else {
        // Forward a copy so later updates do not alter an already emitted array.
        this.destination.next!(latest.slice());
      }
    }

    /**
     * Calls the result selector with the values spread as arguments; an error
     * thrown by the selector is sent to the destination's `error` instead of
     * propagating, and nothing is emitted in that case.
     */
    private _tryResultSelector(values: any[]) {
      let projected: any;
      try {
        projected = this.resultSelector!.apply(this, values);
      } catch (err) {
        this.destination.error!(err);
        return;
      }
      // Kept outside the try block so errors thrown downstream are not caught here.
      this.destination.next!(projected);
    }
  }