Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

windowCount.ts 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { Subject } from '../Subject';
  5. import { OperatorFunction } from '../types';
  /**
   * Splits the values of the source Observable into a sequence of nested
   * Observables ("windows"), where every window carries at most `windowSize`
   * values.
   *
   * <span class="informal">Think of it as {@link bufferCount} that hands out a
   * nested Observable rather than an array.</span>
   *
   * ![](windowCount.png)
   *
   * The resulting Observable emits windows filled with items taken from the
   * source Observable. The first window is emitted as soon as the source is
   * subscribed to; after that a new window is opened every `startWindowEvery`
   * source items, and each window holds no more than `windowSize` items. When
   * the source Observable completes or fails, every window that is still open
   * receives the same notification, which is then forwarded to the output
   * Observable. When `startWindowEvery` is omitted (or is not greater than
   * zero), `windowSize` is used in its place, so a new window opens at the
   * start of the source and again each time the current window has received
   * `windowSize` values and completes.
   *
   * ## Examples
   * Ignore every 3rd click event, starting from the first one
   * ```ts
   * import { fromEvent } from 'rxjs';
   * import { windowCount, map, mergeAll, skip } from 'rxjs/operators';
   *
   * const clicks = fromEvent(document, 'click');
   * const result = clicks.pipe(
   *   windowCount(3),
   *   map(win => win.pipe(skip(1))), // skip first of every 3 clicks
   *   mergeAll()                     // flatten the Observable-of-Observables
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * Ignore every 3rd click event, starting from the third one
   * ```ts
   * import { fromEvent } from 'rxjs';
   * import { windowCount, mergeAll } from 'rxjs/operators';
   *
   * const clicks = fromEvent(document, 'click');
   * const result = clicks.pipe(
   *   windowCount(2, 3),
   *   mergeAll(),                    // flatten the Observable-of-Observables
   * );
   * result.subscribe(x => console.log(x));
   * ```
   *
   * @see {@link window}
   * @see {@link windowTime}
   * @see {@link windowToggle}
   * @see {@link windowWhen}
   * @see {@link bufferCount}
   *
   * @param {number} windowSize Upper bound on the number of values each window
   * emits.
   * @param {number} [startWindowEvery] How many source values pass between the
   * openings of two consecutive windows. With `startWindowEvery` set to `2`,
   * for instance, a window is opened on every other source value. By default a
   * window is opened at the very beginning of the source.
   * @return {Observable<Observable<T>>} An Observable of windows, each of which
   * is itself an Observable of values.
   * @method windowCount
   * @owner Observable
   */
  export function windowCount<T>(
    windowSize: number,
    startWindowEvery: number = 0
  ): OperatorFunction<T, Observable<T>> {
    return function windowCountOperatorFunction(source: Observable<T>) {
      // Build the operator first, then hand it to the source's `lift`.
      const operator = new WindowCountOperator<T>(windowSize, startWindowEvery);
      return source.lift(operator);
    };
  }
  /**
   * Operator object passed to `lift` by `windowCount`. It only remembers the
   * two numeric settings and, when invoked, subscribes a fresh
   * `WindowCountSubscriber` configured with them to the source.
   */
  class WindowCountOperator<T> implements Operator<T, Observable<T>> {
    constructor(
      private windowSize: number,
      private startWindowEvery: number
    ) {}

    /**
     * Subscribes a new `WindowCountSubscriber` (wrapping `subscriber`) to
     * `source` and returns whatever `source.subscribe` returns.
     */
    call(subscriber: Subscriber<Observable<T>>, source: any): any {
      const windowSubscriber = new WindowCountSubscriber(
        subscriber,
        this.windowSize,
        this.startWindowEvery
      );
      return source.subscribe(windowSubscriber);
    }
  }
  /**
   * This JSDoc block is required so that ESDoc treats the class as intended.
   *
   * Subscriber that distributes incoming source values over a list of open
   * window Subjects, closing the oldest window and opening new ones based on
   * a running count of received values.
   * @ignore
   * @extends {Ignored}
   */
  class WindowCountSubscriber<T> extends Subscriber<T> {
    // Currently open windows, oldest first. One window exists from the start.
    private windows: Subject<T>[] = [ new Subject<T>() ];
    // Number of source values handled so far.
    private count: number = 0;

    constructor(
      protected destination: Subscriber<Observable<T>>,
      private windowSize: number,
      private startWindowEvery: number
    ) {
      super(destination);
      // The initial window is delivered downstream right away.
      destination.next(this.windows[0]);
    }

    /**
     * Pushes `value` into every open window, then completes the oldest window
     * if it is due and opens a new window if one is due.
     */
    protected _next(value: T) {
      const { destination, windowSize, windows: openWindows } = this;
      // A non-positive `startWindowEvery` falls back to `windowSize`.
      const stride = this.startWindowEvery > 0 ? this.startWindowEvery : windowSize;

      // Only the windows that were open when this value arrived receive it;
      // stop early if this subscriber gets closed along the way.
      const openAtArrival = openWindows.length;
      let index = 0;
      while (index < openAtArrival && !this.closed) {
        openWindows[index].next(value);
        index++;
      }

      // Complete the oldest window when the counter says it is due.
      const sinceFirstFull = this.count - windowSize + 1;
      const oldestIsDue = sinceFirstFull >= 0 && sinceFirstFull % stride === 0;
      if (oldestIsDue && !this.closed) {
        openWindows.shift().complete();
      }

      // The counter always advances, even if no new window is opened.
      this.count += 1;
      const newWindowIsDue = this.count % stride === 0;
      if (newWindowIsDue && !this.closed) {
        const freshWindow = new Subject<T>();
        openWindows.push(freshWindow);
        destination.next(freshWindow);
      }
    }

    /**
     * Errors every open window (oldest first) and then forwards the error to
     * the destination.
     */
    protected _error(err: any) {
      const pending = this.windows;
      while (pending && pending.length > 0 && !this.closed) {
        pending.shift().error(err);
      }
      this.destination.error(err);
    }

    /**
     * Completes every open window (oldest first) and then completes the
     * destination.
     */
    protected _complete() {
      const pending = this.windows;
      while (pending && pending.length > 0 && !this.closed) {
        pending.shift().complete();
      }
      this.destination.complete();
    }

    /** Drops the window list and resets the counter. */
    protected _unsubscribe() {
      this.windows = null;
      this.count = 0;
    }
  }