Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

buffer.ts 2.5KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { OperatorFunction } from '../types';
  5. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  6. /**
  7. * Buffers the source Observable values until `closingNotifier` emits.
  8. *
  9. * <span class="informal">Collects values from the past as an array, and emits
  10. * that array only when another Observable emits.</span>
  11. *
  12. * ![](buffer.png)
  13. *
  14. * Buffers the incoming Observable values until the given `closingNotifier`
  15. * Observable emits a value, at which point it emits the buffer on the output
  16. * Observable and starts a new buffer internally, awaiting the next time
  17. * `closingNotifier` emits.
  18. *
  19. * ## Example
  20. *
  21. * On every click, emit array of most recent interval events
  22. *
  23. * ```ts
  24. * import { fromEvent, interval } from 'rxjs';
  25. * import { buffer } from 'rxjs/operators';
  26. *
  27. * const clicks = fromEvent(document, 'click');
  28. * const intervalEvents = interval(1000);
  29. * const buffered = intervalEvents.pipe(buffer(clicks));
  30. * buffered.subscribe(x => console.log(x));
  31. * ```
  32. *
  33. * @see {@link bufferCount}
  34. * @see {@link bufferTime}
  35. * @see {@link bufferToggle}
  36. * @see {@link bufferWhen}
  37. * @see {@link window}
  38. *
  39. * @param {Observable<any>} closingNotifier An Observable that signals the
  40. * buffer to be emitted on the output Observable.
  41. * @return {Observable<T[]>} An Observable of buffers, which are arrays of
  42. * values.
  43. * @method buffer
  44. * @owner Observable
  45. */
  46. export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
  47. return function bufferOperatorFunction(source: Observable<T>) {
  48. return source.lift(new BufferOperator<T>(closingNotifier));
  49. };
  50. }
  51. class BufferOperator<T> implements Operator<T, T[]> {
  52. constructor(private closingNotifier: Observable<any>) {
  53. }
  54. call(subscriber: Subscriber<T[]>, source: any): any {
  55. return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
  56. }
  57. }
  58. /**
  59. * We need this JSDoc comment for affecting ESDoc.
  60. * @ignore
  61. * @extends {Ignored}
  62. */
  63. class BufferSubscriber<T> extends SimpleOuterSubscriber<T, any> {
  64. private buffer: T[] = [];
  65. constructor(destination: Subscriber<T[]>, closingNotifier: Observable<any>) {
  66. super(destination);
  67. this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
  68. }
  69. protected _next(value: T) {
  70. this.buffer.push(value);
  71. }
  72. notifyNext(): void {
  73. const buffer = this.buffer;
  74. this.buffer = [];
  75. this.destination.next!(buffer);
  76. }
  77. }