Repositorio del curso CCOM4030 del semestre B91, del proyecto Artesanías con el Instituto de Cultura

bufferToggle.ts 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { Subscription } from '../Subscription';
  5. import { subscribeToResult } from '../util/subscribeToResult';
  6. import { OuterSubscriber } from '../OuterSubscriber';
  7. import { InnerSubscriber } from '../InnerSubscriber';
  8. import { OperatorFunction, SubscribableOrPromise } from '../types';
  /**
   * Collects values from the source Observable into arrays. A new array is
   * started each time `openings` emits, and that array is emitted once the
   * notifier produced by `closingSelector` for that opening emits.
   *
   * <span class="informal">Gathers past values into an array. Collection only
   * begins when `openings` emits; the `closingSelector` function is then called
   * to obtain an Observable that says when the buffer must be closed.</span>
   *
   * ![](bufferToggle.png)
   *
   * Buffers are opened by signals from the Observable given as `openings`.
   * Each buffer is closed and sent downstream when the Subscribable or Promise
   * returned by `closingSelector` emits.
   *
   * ## Example
   *
   * Every other second, emit the click events from the next 500ms
   *
   * ```ts
   * import { fromEvent, interval, EMPTY } from 'rxjs';
   * import { bufferToggle } from 'rxjs/operators';
   *
   * const clicks = fromEvent(document, 'click');
   * const openings = interval(1000);
   * const buffered = clicks.pipe(bufferToggle(openings, i =>
   *   i % 2 ? interval(500) : EMPTY
   * ));
   * buffered.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferWhen}
   * @see {@link windowToggle}
   *
   * @param {SubscribableOrPromise<O>} openings A Subscribable or Promise whose
   * notifications each start a new buffer.
   * @param {function(value: O): SubscribableOrPromise} closingSelector A function
   * that receives the value emitted by `openings` and returns a Subscribable or
   * Promise; when that one emits, the associated buffer is emitted and cleared.
   * @return {Observable<T[]>} An observable of arrays of buffered values.
   * @method bufferToggle
   * @owner Observable
   */
  export function bufferToggle<T, O>(
    openings: SubscribableOrPromise<O>,
    closingSelector: (value: O) => SubscribableOrPromise<any>
  ): OperatorFunction<T, T[]> {
    // The returned operator function lifts the source with an operator that
    // remembers both the opening notifier and the closing selector.
    const bufferToggleOperatorFunction = (source: Observable<T>) =>
      source.lift(new BufferToggleOperator<T, O>(openings, closingSelector));

    return bufferToggleOperatorFunction;
  }
  /**
   * Operator handed to `source.lift` by `bufferToggle`. It stores the opening
   * notifier and the closing selector, and on `call` subscribes to the source
   * with a new `BufferToggleSubscriber` built from them.
   */
  class BufferToggleOperator<T, O> implements Operator<T, T[]> {
    /** Notifier whose emissions start new buffers. */
    private openings: SubscribableOrPromise<O>;
    /** Maps an opening value to the notifier that closes its buffer. */
    private closingSelector: (value: O) => SubscribableOrPromise<any>;

    constructor(
      openings: SubscribableOrPromise<O>,
      closingSelector: (value: O) => SubscribableOrPromise<any>
    ) {
      this.openings = openings;
      this.closingSelector = closingSelector;
    }

    /**
     * Subscribes to `source` with a subscriber that wraps `subscriber`.
     *
     * @param subscriber Downstream subscriber that receives the buffers.
     * @param source The object being subscribed to.
     * @returns Whatever `source.subscribe` returns.
     */
    call(subscriber: Subscriber<T[]>, source: any): any {
      return source.subscribe(
        new BufferToggleSubscriber<T, O>(subscriber, this.openings, this.closingSelector)
      );
    }
  }
  /**
   * Bookkeeping record for a single open buffer.
   */
  interface BufferContext<T> {
    /** Source values collected so far for this buffer. */
    buffer: Array<T>;
    /** Subscription that the closing notifier's subscription is added to. */
    subscription: Subscription;
  }
  /**
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   *
   * Subscriber behind `bufferToggle`. It keeps a list of open buffer contexts,
   * appends every source value to each open buffer, opens a buffer when the
   * `openings` notifier emits, and emits a buffer downstream when it is closed.
   */
  class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
    /** Currently open buffers, in the order they were opened. */
    private contexts: Array<BufferContext<T>> = [];

    /**
     * @param destination Downstream subscriber that receives the buffers.
     * @param openings Notifier subscribed to immediately via `subscribeToResult`.
     * @param closingSelector Called with each opening value; a falsy result
     * means no buffer is opened for that value.
     */
    constructor(destination: Subscriber<T[]>,
                openings: SubscribableOrPromise<O>,
                private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
      super(destination);
      this.add(subscribeToResult(this, openings));
    }

    /** Appends the source value to every buffer that is currently open. */
    protected _next(value: T): void {
      const contexts = this.contexts;
      const len = contexts.length;
      for (let i = 0; i < len; i++) {
        contexts[i].buffer.push(value);
      }
    }

    /**
     * Discards every open buffer without emitting it, releases the
     * subscriptions, then forwards the error through the base class.
     */
    protected _error(err: any): void {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift()!;
        context.subscription.unsubscribe();
        context.buffer = null!;
        context.subscription = null!;
      }
      this.contexts = null!;
      super._error(err);
    }

    /**
     * Emits every still-open buffer downstream, oldest first, releases the
     * subscriptions, then completes through the base class.
     */
    protected _complete(): void {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift()!;
        this.destination.next!(context.buffer);
        context.subscription.unsubscribe();
        context.buffer = null!;
        context.subscription = null!;
      }
      this.contexts = null!;
      super._complete();
    }

    /**
     * Handles an emission from an inner subscription. A truthy `outerValue` is
     * treated as the context of the buffer to close; otherwise the emission is
     * treated as an opening and `innerValue` is passed to the closing selector.
     * NOTE(review): presumably `outerValue` is the third argument given to
     * `subscribeToResult` (the context in `trySubscribe`, nothing for
     * `openings`) — confirm against `OuterSubscriber`/`InnerSubscriber`.
     */
    notifyNext(outerValue: any, innerValue: O): void {
      outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
    }

    /**
     * Handles completion of an inner subscription by closing the buffer whose
     * context was attached to it in `trySubscribe`. When no context is
     * attached, `closeBuffer` does nothing.
     */
    notifyComplete(innerSub: InnerSubscriber<T, O>): void {
      this.closeBuffer((<any> innerSub).context);
    }

    /**
     * Asks the closing selector for a notifier and, if one is returned, opens
     * a buffer tied to it. Anything thrown along the way is routed to `_error`.
     */
    private openBuffer(value: O): void {
      try {
        const closingSelector = this.closingSelector;
        const closingNotifier = closingSelector.call(this, value);
        if (closingNotifier) {
          this.trySubscribe(closingNotifier);
        }
      } catch (err) {
        this._error(err);
      }
    }

    /**
     * Emits the buffer of `context` downstream and stops tracking it.
     *
     * Does nothing when the context list is gone, when `context` is falsy, or
     * when `context` is no longer tracked. The last guard matters because this
     * method can be reached twice for one context (for example from
     * `notifyNext` and then again from `trySubscribe` when the inner
     * subscription is already closed). Without it the buffer would be emitted
     * a second time and `splice(-1, 1)` would drop the last, unrelated context.
     */
    private closeBuffer(context: BufferContext<T>): void {
      const contexts = this.contexts;
      if (!contexts || !context) {
        return;
      }
      if (contexts.indexOf(context) === -1) {
        // Already closed earlier: nothing to emit and nothing to remove.
        return;
      }
      const { buffer, subscription } = context;
      this.destination.next!(buffer);
      // Look the index up again after emitting, since the emission may have
      // re-entered this subscriber and changed the list.
      const index = contexts.indexOf(context);
      if (index !== -1) {
        contexts.splice(index, 1);
      }
      this.remove(subscription);
      subscription.unsubscribe();
    }

    /**
     * Registers a new buffer context and subscribes to its closing notifier.
     * If no live inner subscription results, the buffer is closed right away;
     * otherwise the context is attached to the inner subscription so that
     * `notifyComplete` can find it, and the inner subscription is tracked both
     * by this subscriber and by the context's own subscription.
     */
    private trySubscribe(closingNotifier: any): void {
      const contexts = this.contexts;
      const buffer: Array<T> = [];
      const subscription = new Subscription();
      const context = { buffer, subscription };
      contexts.push(context);
      const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
      if (!innerSubscription || innerSubscription.closed) {
        this.closeBuffer(context);
      } else {
        (innerSubscription as any).context = context;
        this.add(innerSubscription);
        subscription.add(innerSubscription);
      }
    }
  }
  160. }