Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

mergeScan.ts 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import { Operator } from '../Operator';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { ObservableInput, OperatorFunction } from '../types';
  6. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  7. /**
  8. * Applies an accumulator function over the source Observable where the
  9. * accumulator function itself returns an Observable, then each intermediate
  10. * Observable returned is merged into the output Observable.
  11. *
  12. * <span class="informal">It's like {@link scan}, but the Observables returned
  13. * by the accumulator are merged into the outer Observable.</span>
  14. *
  15. * ## Example
  16. * Count the number of click events
  17. * ```ts
  18. * import { fromEvent, of } from 'rxjs';
  19. * import { mapTo, mergeScan } from 'rxjs/operators';
  20. *
  21. * const click$ = fromEvent(document, 'click');
  22. * const one$ = click$.pipe(mapTo(1));
  23. * const seed = 0;
  24. * const count$ = one$.pipe(
  25. * mergeScan((acc, one) => of(acc + one), seed),
  26. * );
  27. * count$.subscribe(x => console.log(x));
  28. *
  29. * // Results:
  30. * // 1
  31. * // 2
  32. * // 3
  33. * // 4
  34. * // ...and so on for each click
  35. * ```
  36. *
  37. * @param {function(acc: R, value: T): Observable<R>} accumulator
  38. * The accumulator function called on each source value.
  39. * @param seed The initial accumulation value.
  40. * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of
  41. * input Observables being subscribed to concurrently.
  42. * @return {Observable<R>} An observable of the accumulated values.
  43. * @method mergeScan
  44. * @owner Observable
  45. */
  46. export function mergeScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
  47. seed: R,
  48. concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
  49. return (source: Observable<T>) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
  50. }
  51. export class MergeScanOperator<T, R> implements Operator<T, R> {
  52. constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
  53. private seed: R,
  54. private concurrent: number) {
  55. }
  56. call(subscriber: Subscriber<R>, source: any): any {
  57. return source.subscribe(new MergeScanSubscriber(
  58. subscriber, this.accumulator, this.seed, this.concurrent
  59. ));
  60. }
  61. }
  62. /**
  63. * We need this JSDoc comment for affecting ESDoc.
  64. * @ignore
  65. * @extends {Ignored}
  66. */
  67. export class MergeScanSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
  68. private hasValue: boolean = false;
  69. private hasCompleted: boolean = false;
  70. private buffer: Observable<any>[] = [];
  71. private active: number = 0;
  72. protected index: number = 0;
  73. constructor(destination: Subscriber<R>,
  74. private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
  75. private acc: R,
  76. private concurrent: number) {
  77. super(destination);
  78. }
  79. protected _next(value: any): void {
  80. if (this.active < this.concurrent) {
  81. const index = this.index++;
  82. const destination = this.destination;
  83. let ish;
  84. try {
  85. const { accumulator } = this;
  86. ish = accumulator(this.acc, value, index);
  87. } catch (e) {
  88. return destination.error!(e);
  89. }
  90. this.active++;
  91. this._innerSub(ish);
  92. } else {
  93. this.buffer.push(value);
  94. }
  95. }
  96. private _innerSub(ish: any): void {
  97. const innerSubscriber = new SimpleInnerSubscriber(this);
  98. const destination = this.destination as Subscription;
  99. destination.add(innerSubscriber);
  100. const innerSubscription = innerSubscribe(ish, innerSubscriber);
  101. // The returned subscription will usually be the subscriber that was
  102. // passed. However, interop subscribers will be wrapped and for
  103. // unsubscriptions to chain correctly, the wrapper needs to be added, too.
  104. if (innerSubscription !== innerSubscriber) {
  105. destination.add(innerSubscription);
  106. }
  107. }
  108. protected _complete(): void {
  109. this.hasCompleted = true;
  110. if (this.active === 0 && this.buffer.length === 0) {
  111. if (this.hasValue === false) {
  112. this.destination.next!(this.acc);
  113. }
  114. this.destination.complete!();
  115. }
  116. this.unsubscribe();
  117. }
  118. notifyNext(innerValue: R): void {
  119. const { destination } = this;
  120. this.acc = innerValue;
  121. this.hasValue = true;
  122. destination.next!(innerValue);
  123. }
  124. notifyComplete(): void {
  125. const buffer = this.buffer;
  126. this.active--;
  127. if (buffer.length > 0) {
  128. this._next(buffer.shift());
  129. } else if (this.active === 0 && this.hasCompleted) {
  130. if (this.hasValue === false) {
  131. this.destination.next!(this.acc);
  132. }
  133. this.destination.complete!();
  134. }
  135. }
  136. }