Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

windowToggle.ts 6.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Observable } from '../Observable';
  4. import { Subject } from '../Subject';
  5. import { Subscription } from '../Subscription';
  6. import { OuterSubscriber } from '../OuterSubscriber';
  7. import { InnerSubscriber } from '../InnerSubscriber';
  8. import { subscribeToResult } from '../util/subscribeToResult';
  9. import { OperatorFunction } from '../types';
  10. /**
  11. * Branch out the source Observable values as a nested Observable starting from
  12. * an emission from `openings` and ending when the output of `closingSelector`
  13. * emits.
  14. *
  15. * <span class="informal">It's like {@link bufferToggle}, but emits a nested
  16. * Observable instead of an array.</span>
  17. *
  18. * ![](windowToggle.png)
  19. *
  20. * Returns an Observable that emits windows of items it collects from the source
  21. * Observable. The output Observable emits windows that contain those items
  22. * emitted by the source Observable between the time when the `openings`
  23. * Observable emits an item and when the Observable returned by
  24. * `closingSelector` emits an item.
  25. *
  26. * ## Example
  27. * Every other second, emit the click events from the next 500ms
  28. * ```ts
  29. * import { fromEvent, interval, EMPTY } from 'rxjs';
  30. * import { windowToggle, mergeAll } from 'rxjs/operators';
  31. *
  32. * const clicks = fromEvent(document, 'click');
  33. * const openings = interval(1000);
  34. * const result = clicks.pipe(
  35. * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
  36. * mergeAll()
  37. * );
  38. * result.subscribe(x => console.log(x));
  39. * ```
  40. *
  41. * @see {@link window}
  42. * @see {@link windowCount}
  43. * @see {@link windowTime}
  44. * @see {@link windowWhen}
  45. * @see {@link bufferToggle}
  46. *
  47. * @param {Observable<O>} openings An observable of notifications to start new
  48. * windows.
  49. * @param {function(value: O): Observable} closingSelector A function that takes
  50. * the value emitted by the `openings` observable and returns an Observable,
  51. * which, when it emits (either `next` or `complete`), signals that the
  52. * associated window should complete.
  53. * @return {Observable<Observable<T>>} An observable of windows, which in turn
  54. * are Observables.
  55. * @method windowToggle
  56. * @owner Observable
  57. */
  58. export function windowToggle<T, O>(openings: Observable<O>,
  59. closingSelector: (openValue: O) => Observable<any>): OperatorFunction<T, Observable<T>> {
  60. return (source: Observable<T>) => source.lift(new WindowToggleOperator<T, O>(openings, closingSelector));
  61. }
  62. class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {
  63. constructor(private openings: Observable<O>,
  64. private closingSelector: (openValue: O) => Observable<any>) {
  65. }
  66. call(subscriber: Subscriber<Observable<T>>, source: any): any {
  67. return source.subscribe(new WindowToggleSubscriber(
  68. subscriber, this.openings, this.closingSelector
  69. ));
  70. }
  71. }
  72. interface WindowContext<T> {
  73. window: Subject<T>;
  74. subscription: Subscription;
  75. }
  76. /**
  77. * We need this JSDoc comment for affecting ESDoc.
  78. * @ignore
  79. * @extends {Ignored}
  80. */
  81. class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
  82. private contexts: WindowContext<T>[] = [];
  83. private openSubscription: Subscription;
  84. constructor(destination: Subscriber<Observable<T>>,
  85. private openings: Observable<O>,
  86. private closingSelector: (openValue: O) => Observable<any>) {
  87. super(destination);
  88. this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
  89. }
  90. protected _next(value: T) {
  91. const { contexts } = this;
  92. if (contexts) {
  93. const len = contexts.length;
  94. for (let i = 0; i < len; i++) {
  95. contexts[i].window.next(value);
  96. }
  97. }
  98. }
  99. protected _error(err: any) {
  100. const { contexts } = this;
  101. this.contexts = null;
  102. if (contexts) {
  103. const len = contexts.length;
  104. let index = -1;
  105. while (++index < len) {
  106. const context = contexts[index];
  107. context.window.error(err);
  108. context.subscription.unsubscribe();
  109. }
  110. }
  111. super._error(err);
  112. }
  113. protected _complete() {
  114. const { contexts } = this;
  115. this.contexts = null;
  116. if (contexts) {
  117. const len = contexts.length;
  118. let index = -1;
  119. while (++index < len) {
  120. const context = contexts[index];
  121. context.window.complete();
  122. context.subscription.unsubscribe();
  123. }
  124. }
  125. super._complete();
  126. }
  127. /** @deprecated This is an internal implementation detail, do not use. */
  128. _unsubscribe() {
  129. const { contexts } = this;
  130. this.contexts = null;
  131. if (contexts) {
  132. const len = contexts.length;
  133. let index = -1;
  134. while (++index < len) {
  135. const context = contexts[index];
  136. context.window.unsubscribe();
  137. context.subscription.unsubscribe();
  138. }
  139. }
  140. }
  141. notifyNext(outerValue: any, innerValue: any,
  142. outerIndex: number, innerIndex: number,
  143. innerSub: InnerSubscriber<T, any>): void {
  144. if (outerValue === this.openings) {
  145. let closingNotifier;
  146. try {
  147. const { closingSelector } = this;
  148. closingNotifier = closingSelector(innerValue);
  149. } catch (e) {
  150. return this.error(e);
  151. }
  152. const window = new Subject<T>();
  153. const subscription = new Subscription();
  154. const context = { window, subscription };
  155. this.contexts.push(context);
  156. const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
  157. if (innerSubscription.closed) {
  158. this.closeWindow(this.contexts.length - 1);
  159. } else {
  160. (<any>innerSubscription).context = context;
  161. subscription.add(innerSubscription);
  162. }
  163. this.destination.next(window);
  164. } else {
  165. this.closeWindow(this.contexts.indexOf(outerValue));
  166. }
  167. }
  168. notifyError(err: any): void {
  169. this.error(err);
  170. }
  171. notifyComplete(inner: Subscription): void {
  172. if (inner !== this.openSubscription) {
  173. this.closeWindow(this.contexts.indexOf((<any> inner).context));
  174. }
  175. }
  176. private closeWindow(index: number): void {
  177. if (index === -1) {
  178. return;
  179. }
  180. const { contexts } = this;
  181. const context = contexts[index];
  182. const { window, subscription } = context;
  183. contexts.splice(index, 1);
  184. window.complete();
  185. subscription.unsubscribe();
  186. }
  187. }