Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

window.ts 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import { Observable } from '../Observable';
  2. import { OperatorFunction } from '../types';
  3. import { Subject } from '../Subject';
  4. import { Subscriber } from '../Subscriber';
  5. import { Operator } from '../Operator';
  6. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  7. /**
  8. * Branch out the source Observable values as a nested Observable whenever
  9. * `windowBoundaries` emits.
  10. *
  11. * <span class="informal">It's like {@link buffer}, but emits a nested Observable
  12. * instead of an array.</span>
  13. *
  14. * ![](window.png)
  15. *
  16. * Returns an Observable that emits windows of items it collects from the source
  17. * Observable. The output Observable emits connected, non-overlapping
  18. * windows. It emits the current window and opens a new one whenever the
  19. * Observable `windowBoundaries` emits an item. Because each window is an
  20. * Observable, the output is a higher-order Observable.
  21. *
  22. * ## Example
  23. * In every window of 1 second each, emit at most 2 click events
  24. * ```ts
  25. * import { fromEvent, interval } from 'rxjs';
  26. * import { window, mergeAll, map, take } from 'rxjs/operators';
  27. *
  28. * const clicks = fromEvent(document, 'click');
  29. * const sec = interval(1000);
  30. * const result = clicks.pipe(
  31. * window(sec),
  32. * map(win => win.pipe(take(2))), // each window has at most 2 emissions
  33. * mergeAll(), // flatten the Observable-of-Observables
  34. * );
  35. * result.subscribe(x => console.log(x));
  36. * ```
  37. * @see {@link windowCount}
  38. * @see {@link windowTime}
  39. * @see {@link windowToggle}
  40. * @see {@link windowWhen}
  41. * @see {@link buffer}
  42. *
  43. * @param {Observable<any>} windowBoundaries An Observable that completes the
  44. * previous window and starts a new window.
  45. * @return {Observable<Observable<T>>} An Observable of windows, which are
  46. * Observables emitting values of the source Observable.
  47. * @method window
  48. * @owner Observable
  49. */
  50. export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
  51. return function windowOperatorFunction(source: Observable<T>) {
  52. return source.lift(new WindowOperator(windowBoundaries));
  53. };
  54. }
  55. class WindowOperator<T> implements Operator<T, Observable<T>> {
  56. constructor(private windowBoundaries: Observable<any>) {
  57. }
  58. call(subscriber: Subscriber<Observable<T>>, source: any): any {
  59. const windowSubscriber = new WindowSubscriber(subscriber);
  60. const sourceSubscription = source.subscribe(windowSubscriber);
  61. if (!sourceSubscription.closed) {
  62. windowSubscriber.add(innerSubscribe(this.windowBoundaries, new SimpleInnerSubscriber(windowSubscriber)));
  63. }
  64. return sourceSubscription;
  65. }
  66. }
  67. /**
  68. * We need this JSDoc comment for affecting ESDoc.
  69. * @ignore
  70. * @extends {Ignored}
  71. */
  72. class WindowSubscriber<T> extends SimpleOuterSubscriber<T, any> {
  73. private window: Subject<T> = new Subject<T>();
  74. constructor(destination: Subscriber<Observable<T>>) {
  75. super(destination);
  76. destination.next(this.window);
  77. }
  78. notifyNext(): void {
  79. this.openWindow();
  80. }
  81. notifyError(error: any): void {
  82. this._error(error);
  83. }
  84. notifyComplete(): void {
  85. this._complete();
  86. }
  87. protected _next(value: T): void {
  88. this.window.next(value);
  89. }
  90. protected _error(err: any): void {
  91. this.window.error(err);
  92. this.destination.error!(err);
  93. }
  94. protected _complete(): void {
  95. this.window.complete();
  96. this.destination.complete!();
  97. }
  98. /** @deprecated This is an internal implementation detail, do not use. */
  99. _unsubscribe() {
  100. this.window = null!;
  101. }
  102. private openWindow(): void {
  103. const prevWindow = this.window;
  104. if (prevWindow) {
  105. prevWindow.complete();
  106. }
  107. const destination = this.destination;
  108. const newWindow = this.window = new Subject<T>();
  109. destination.next!(newWindow);
  110. }
  111. }