Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

timeoutWith.ts 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import { Operator } from '../Operator';
  2. import { Subscriber } from '../Subscriber';
  3. import { async } from '../scheduler/async';
  4. import { Observable } from '../Observable';
  5. import { isDate } from '../util/isDate';
  6. import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
  7. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  /* tslint:disable:max-line-length */
  export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
  /* tslint:enable:max-line-length */
  /**
   *
   * Switches to a second (fallback) Observable if the source Observable does not
   * emit a value within the given time span, where `timeout` would emit an error instead.
   *
   * <span class="informal">It's a version of `timeout` operator that lets you specify a fallback Observable.</span>
   *
   * ![](timeoutWith.png)
   *
   * `timeoutWith` is a variation of the `timeout` operator. It behaves exactly the same,
   * still accepting as a first argument either a number or a Date, which control - respectively -
   * when values of the source Observable should be emitted or when it should complete.
   *
   * The only difference is that it accepts a second, required parameter. This parameter
   * should be an Observable which will be subscribed to when the source Observable fails any
   * timeout check. So whenever regular `timeout` would emit an error, `timeoutWith` will instead
   * start re-emitting values from the second Observable. Note that this fallback Observable is not
   * checked for timeouts itself, so it can emit values and complete at arbitrary points in time.
   * From the moment of the second subscription, the Observable returned from `timeoutWith` simply
   * mirrors the fallback stream. When that stream completes, it completes as well.
   *
   * The scheduler, which in the case of `timeout` is provided as a second argument, can still be
   * provided here - as a third, optional parameter. It is still used to schedule timeout checks
   * and - as a consequence - determines when the second Observable will be subscribed to, since
   * the subscription happens immediately after a failed check.
   *
   * Implementation notes: a numeric `due` is passed through `Math.abs`, so a negative period is
   * treated like its positive counterpart. For a Date `due`, the remaining delay is computed from
   * `scheduler.now()` at the moment the returned operator function is applied to the source.
   *
   * ## Example
   * Add fallback observable
   * ```ts
   * import { interval } from 'rxjs';
   * import { timeoutWith } from 'rxjs/operators';
   *
   * const seconds = interval(1000);
   * const minutes = interval(60 * 1000);
   *
   * seconds.pipe(timeoutWith(900, minutes))
   *   .subscribe(
   *     value => console.log(value), // After 900ms, will start emitting `minutes`,
   *                                  // since first value of `seconds` will not arrive fast enough.
   *     err => console.log(err),     // Would be called after 900ms in case of `timeout`,
   *                                  // but here will never be called.
   *   );
   * ```
   *
   * @param {number|Date} due Number specifying the period within which the Observable must emit
   * values, or Date specifying before when the Observable should complete.
   * @param {Observable<T>} withObservable Observable which will be subscribed to if the source fails
   * a timeout check.
   * @param {SchedulerLike} [scheduler] Scheduler controlling when timeout checks occur.
   * @return {OperatorFunction<T, T | R>} A function that returns an Observable mirroring the
   * behaviour of the source or, when a timeout check fails, of the Observable passed as the
   * second parameter.
   * @method timeoutWith
   * @owner Observable
   */
  export function timeoutWith<T, R>(
    due: number | Date,
    withObservable: ObservableInput<R>,
    scheduler: SchedulerLike = async
  ): OperatorFunction<T, T | R> {
    return (source: Observable<T>) => {
      // A Date means "time out at this absolute instant"; a number means
      // "time out after this many milliseconds without a value".
      const isAbsolute = isDate(due);

      let delay: number;
      if (isAbsolute) {
        // Convert the target instant into a delay relative to the scheduler's clock.
        delay = +due - scheduler.now();
      } else {
        delay = Math.abs(due as number);
      }

      return source.lift(new TimeoutWithOperator(delay, isAbsolute, withObservable, scheduler));
    };
  }
  /**
   * Operator object passed to `source.lift()` by `timeoutWith`. It stores the timeout
   * settings and, when called, subscribes to the source through a new
   * `TimeoutWithSubscriber` wrapping the downstream subscriber.
   */
  class TimeoutWithOperator<T> implements Operator<T, T> {
    constructor(
      private waitFor: number,
      private absoluteTimeout: boolean,
      private withObservable: ObservableInput<any>,
      private scheduler: SchedulerLike
    ) {}

    /**
     * @param subscriber Downstream subscriber that will receive the values.
     * @param source The source being lifted; only its `subscribe` method is used.
     * @return Whatever `source.subscribe` returns (the teardown for the subscription).
     */
    call(subscriber: Subscriber<T>, source: any): TeardownLogic {
      const { absoluteTimeout, waitFor, withObservable, scheduler } = this;
      // Note the argument order: the subscriber takes `absoluteTimeout` before `waitFor`.
      return source.subscribe(
        new TimeoutWithSubscriber(subscriber, absoluteTimeout, waitFor, withObservable, scheduler)
      );
    }
  }
  /**
   * Subscriber used by `timeoutWith`: it keeps a scheduled timeout action alive while
   * passing source values to `super._next`, and when the action fires it subscribes
   * to the fallback `withObservable` instead.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class TimeoutWithSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
    /** The currently scheduled timeout action, kept so it can be rescheduled. */
    private action?: SchedulerAction<TimeoutWithSubscriber<T, R>>;

    constructor(
      destination: Subscriber<T>,
      private absoluteTimeout: boolean,
      private waitFor: number,
      private withObservable: ObservableInput<any>,
      private scheduler: SchedulerLike
    ) {
      super(destination);
      // Arm the timeout as soon as the subscriber is constructed.
      this.scheduleTimeout();
    }

    /**
     * Work function run by the scheduler when the timeout elapses: recycles the
     * subscriber and subscribes it to the fallback Observable.
     */
    private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
      // Read the fallback first: `_unsubscribe()` below nulls `withObservable`, and
      // recycling presumably triggers it (NOTE(review): confirm against Subscriber).
      const fallback = subscriber.withObservable;
      subscriber._unsubscribeAndRecycle();

      const fallbackSubscription = innerSubscribe(fallback, new SimpleInnerSubscriber(subscriber));
      subscriber.add(fallbackSubscription);
    }

    /** Schedules the timeout for the first time, or reschedules the existing action. */
    private scheduleTimeout(): void {
      const pending = this.action;

      if (!pending) {
        // First call: create the action on the scheduler and tie it to this
        // subscriber so it is torn down together with it.
        const created = this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
          TimeoutWithSubscriber.dispatchTimeout as any, this.waitFor, this
        ) as SchedulerAction<TimeoutWithSubscriber<T, R>>;
        this.action = created;
        this.add(created);
        return;
      }

      // Recycle the action if we've already scheduled one. All the production
      // Scheduler Actions mutate their state/delay time and return themselves.
      // VirtualActions are immutable, so they create and return a clone. In that
      // case we must point the action reference at the most recent VirtualAction,
      // to ensure that's the one we clone from next time.
      this.action = pending.schedule(this, this.waitFor) as SchedulerAction<TimeoutWithSubscriber<T, R>>;
    }

    /**
     * Handles a value from the source: restarts the timer unless the timeout is
     * absolute, then hands the value to the base implementation.
     */
    protected _next(value: T): void {
      // An absolute (Date-based) timeout is not pushed back by new values.
      if (!this.absoluteTimeout) {
        this.scheduleTimeout();
      }
      super._next(value);
    }

    /** @deprecated This is an internal implementation detail, do not use. */
    _unsubscribe() {
      // Drop references held by this subscriber.
      this.scheduler = null!;
      this.withObservable = null!;
      this.action = undefined;
    }
  }