Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

exhaustMap.ts 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import { Operator } from '../Operator';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
  6. import { map } from './map';
  7. import { from } from '../observable/from';
  8. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  9. /* tslint:disable:max-line-length */
  10. export function exhaustMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O): OperatorFunction<T, ObservedValueOf<O>>;
  11. /** @deprecated resultSelector is no longer supported. Use inner map instead. */
  12. export function exhaustMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: undefined): OperatorFunction<T, ObservedValueOf<O>>;
  13. /** @deprecated resultSelector is no longer supported. Use inner map instead. */
  14. export function exhaustMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
  15. /* tslint:enable:max-line-length */
  16. /**
  17. * Projects each source value to an Observable which is merged in the output
  18. * Observable only if the previous projected Observable has completed.
  19. *
  20. * <span class="informal">Maps each value to an Observable, then flattens all of
  21. * these inner Observables using {@link exhaust}.</span>
  22. *
  23. * ![](exhaustMap.png)
  24. *
  25. * Returns an Observable that emits items based on applying a function that you
  26. * supply to each item emitted by the source Observable, where that function
  27. * returns an (so-called "inner") Observable. When it projects a source value to
  28. * an Observable, the output Observable begins emitting the items emitted by
  29. * that projected Observable. However, `exhaustMap` ignores every new projected
  30. * Observable if the previous projected Observable has not yet completed. Once
  31. * that one completes, it will accept and flatten the next projected Observable
  32. * and repeat this process.
  33. *
  34. * ## Example
  35. * Run a finite timer for each click, only if there is no currently active timer
  36. * ```ts
  37. * import { fromEvent, interval } from 'rxjs';
  38. * import { exhaustMap, take } from 'rxjs/operators';
  39. *
  40. * const clicks = fromEvent(document, 'click');
  41. * const result = clicks.pipe(
  42. * exhaustMap(ev => interval(1000).pipe(take(5)))
  43. * );
  44. * result.subscribe(x => console.log(x));
  45. * ```
  46. *
  47. * @see {@link concatMap}
  48. * @see {@link exhaust}
  49. * @see {@link mergeMap}
  50. * @see {@link switchMap}
  51. *
  52. * @param {function(value: T, ?index: number): ObservableInput} project A function
  53. * that, when applied to an item emitted by the source Observable, returns an
  54. * Observable.
  55. * @return {Observable} An Observable containing projected Observables
  56. * of each item of the source, ignoring projected Observables that start before
  57. * their preceding Observable has completed.
  58. * @method exhaustMap
  59. * @owner Observable
  60. */
  61. export function exhaustMap<T, R, O extends ObservableInput<any>>(
  62. project: (value: T, index: number) => O,
  63. resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R,
  64. ): OperatorFunction<T, ObservedValueOf<O>|R> {
  65. if (resultSelector) {
  66. // DEPRECATED PATH
  67. return (source: Observable<T>) => source.pipe(
  68. exhaustMap((a, i) => from(project(a, i)).pipe(
  69. map((b: any, ii: any) => resultSelector(a, b, i, ii)),
  70. )),
  71. );
  72. }
  73. return (source: Observable<T>) =>
  74. source.lift(new ExhaustMapOperator(project));
  75. }
  76. class ExhaustMapOperator<T, R> implements Operator<T, R> {
  77. constructor(private project: (value: T, index: number) => ObservableInput<R>) {
  78. }
  79. call(subscriber: Subscriber<R>, source: any): any {
  80. return source.subscribe(new ExhaustMapSubscriber(subscriber, this.project));
  81. }
  82. }
  83. /**
  84. * We need this JSDoc comment for affecting ESDoc.
  85. * @ignore
  86. * @extends {Ignored}
  87. */
  88. class ExhaustMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
  89. private hasSubscription = false;
  90. private hasCompleted = false;
  91. private index = 0;
  92. constructor(destination: Subscriber<R>,
  93. private project: (value: T, index: number) => ObservableInput<R>) {
  94. super(destination);
  95. }
  96. protected _next(value: T): void {
  97. if (!this.hasSubscription) {
  98. this.tryNext(value);
  99. }
  100. }
  101. private tryNext(value: T): void {
  102. let result: ObservableInput<R>;
  103. const index = this.index++;
  104. try {
  105. result = this.project(value, index);
  106. } catch (err) {
  107. this.destination.error!(err);
  108. return;
  109. }
  110. this.hasSubscription = true;
  111. this._innerSub(result);
  112. }
  113. private _innerSub(result: ObservableInput<R>): void {
  114. const innerSubscriber = new SimpleInnerSubscriber(this);
  115. const destination = this.destination as Subscription;
  116. destination.add(innerSubscriber);
  117. const innerSubscription = innerSubscribe(result, innerSubscriber);
  118. // The returned subscription will usually be the subscriber that was
  119. // passed. However, interop subscribers will be wrapped and for
  120. // unsubscriptions to chain correctly, the wrapper needs to be added, too.
  121. if (innerSubscription !== innerSubscriber) {
  122. destination.add(innerSubscription);
  123. }
  124. }
  125. protected _complete(): void {
  126. this.hasCompleted = true;
  127. if (!this.hasSubscription) {
  128. this.destination.complete!();
  129. }
  130. this.unsubscribe();
  131. }
  132. notifyNext(innerValue: R): void {
  133. this.destination.next!(innerValue);
  134. }
  135. notifyError(err: any): void {
  136. this.destination.error!(err);
  137. }
  138. notifyComplete(): void {
  139. this.hasSubscription = false;
  140. if (this.hasCompleted) {
  141. this.destination.complete!();
  142. }
  143. }
  144. }