Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

refCount.ts 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { Subscription } from '../Subscription';
  4. import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
  5. import { ConnectableObservable } from '../observable/ConnectableObservable';
  6. import { Observable } from '../Observable';
  7. /**
  8. * Make a {@link ConnectableObservable} behave like a ordinary observable and automates the way
  9. * you can connect to it.
  10. *
  11. * Internally it counts the subscriptions to the observable and subscribes (only once) to the source if
  12. * the number of subscriptions is larger than 0. If the number of subscriptions is smaller than 1, it
  13. * unsubscribes from the source. This way you can make sure that everything before the *published*
  14. * refCount has only a single subscription independently of the number of subscribers to the target
  15. * observable.
  16. *
  17. * Note that using the {@link share} operator is exactly the same as using the *publish* operator
  18. * (making the observable hot) and the *refCount* operator in a sequence.
  19. *
  20. * ![](refCount.png)
  21. *
  22. * ## Example
  23. *
  24. * In the following example there are two intervals turned into connectable observables
  25. * by using the *publish* operator. The first one uses the *refCount* operator, the
  26. * second one does not use it. You will notice that a connectable observable does nothing
  27. * until you call its connect function.
  28. *
  29. * ```ts
  30. * import { interval } from 'rxjs';
  31. * import { tap, publish, refCount } from 'rxjs/operators';
  32. *
  33. * // Turn the interval observable into a ConnectableObservable (hot)
  34. * const refCountInterval = interval(400).pipe(
  35. * tap((num) => console.log(`refCount ${num}`)),
  36. * publish(),
  37. * refCount()
  38. * );
  39. *
  40. * const publishedInterval = interval(400).pipe(
  41. * tap((num) => console.log(`publish ${num}`)),
  42. * publish()
  43. * );
  44. *
  45. * refCountInterval.subscribe();
  46. * refCountInterval.subscribe();
  47. * // 'refCount 0' -----> 'refCount 1' -----> etc
  48. * // All subscriptions will receive the same value and the tap (and
  49. * // every other operator) before the publish operator will be executed
  50. * // only once per event independently of the number of subscriptions.
  51. *
  52. * publishedInterval.subscribe();
  53. * // Nothing happens until you call .connect() on the observable.
  54. * ```
  55. *
  56. * @see {@link ConnectableObservable}
  57. * @see {@link share}
  58. * @see {@link publish}
  59. */
  60. export function refCount<T>(): MonoTypeOperatorFunction<T> {
  61. return function refCountOperatorFunction(source: ConnectableObservable<T>): Observable<T> {
  62. return source.lift(new RefCountOperator(source));
  63. } as MonoTypeOperatorFunction<T>;
  64. }
  65. class RefCountOperator<T> implements Operator<T, T> {
  66. constructor(private connectable: ConnectableObservable<T>) {
  67. }
  68. call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  69. const { connectable } = this;
  70. (<any> connectable)._refCount++;
  71. const refCounter = new RefCountSubscriber(subscriber, connectable);
  72. const subscription = source.subscribe(refCounter);
  73. if (!refCounter.closed) {
  74. (<any> refCounter).connection = connectable.connect();
  75. }
  76. return subscription;
  77. }
  78. }
  79. class RefCountSubscriber<T> extends Subscriber<T> {
  80. private connection: Subscription;
  81. constructor(destination: Subscriber<T>,
  82. private connectable: ConnectableObservable<T>) {
  83. super(destination);
  84. }
  85. protected _unsubscribe() {
  86. const { connectable } = this;
  87. if (!connectable) {
  88. this.connection = null;
  89. return;
  90. }
  91. this.connectable = null;
  92. const refCount = (<any> connectable)._refCount;
  93. if (refCount <= 0) {
  94. this.connection = null;
  95. return;
  96. }
  97. (<any> connectable)._refCount = refCount - 1;
  98. if (refCount > 1) {
  99. this.connection = null;
  100. return;
  101. }
  102. ///
  103. // Compare the local RefCountSubscriber's connection Subscription to the
  104. // connection Subscription on the shared ConnectableObservable. In cases
  105. // where the ConnectableObservable source synchronously emits values, and
  106. // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
  107. // execution continues to here before the RefCountOperator has a chance to
  108. // supply the RefCountSubscriber with the shared connection Subscription.
  109. // For example:
  110. // ```
  111. // range(0, 10).pipe(
  112. // publish(),
  113. // refCount(),
  114. // take(5),
  115. // )
  116. // .subscribe();
  117. // ```
  118. // In order to account for this case, RefCountSubscriber should only dispose
  119. // the ConnectableObservable's shared connection Subscription if the
  120. // connection Subscription exists, *and* either:
  121. // a. RefCountSubscriber doesn't have a reference to the shared connection
  122. // Subscription yet, or,
  123. // b. RefCountSubscriber's connection Subscription reference is identical
  124. // to the shared connection Subscription
  125. ///
  126. const { connection } = this;
  127. const sharedConnection = (<any> connectable)._connection;
  128. this.connection = null;
  129. if (sharedConnection && (!connection || sharedConnection === connection)) {
  130. sharedConnection.unsubscribe();
  131. }
  132. }
  133. }