Repositorio del curso CCOM4030 del semestre B91 para el proyecto Artesanías con el Instituto de Cultura

bufferTime.ts 8.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import { Operator } from '../Operator';
  2. import { async } from '../scheduler/async';
  3. import { Observable } from '../Observable';
  4. import { Subscriber } from '../Subscriber';
  5. import { Subscription } from '../Subscription';
  6. import { isScheduler } from '../util/isScheduler';
  7. import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';
  8. /* tslint:disable:max-line-length */
  9. export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
  10. export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
  11. export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
  12. /* tslint:enable:max-line-length */
  13. /**
  14. * Buffers the source Observable values for a specific time period.
  15. *
  16. * <span class="informal">Collects values from the past as an array, and emits
  17. * those arrays periodically in time.</span>
  18. *
  19. * ![](bufferTime.png)
  20. *
  21. * Buffers values from the source for a specific time duration `bufferTimeSpan`.
  22. * Unless the optional argument `bufferCreationInterval` is given, it emits and
  23. * resets the buffer every `bufferTimeSpan` milliseconds. If
  24. * `bufferCreationInterval` is given, this operator opens the buffer every
  25. * `bufferCreationInterval` milliseconds and closes (emits and resets) the
  26. * buffer every `bufferTimeSpan` milliseconds. When the optional argument
  27. * `maxBufferSize` is specified, the buffer will be closed either after
  28. * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
  29. *
  30. * ## Examples
  31. *
  32. * Every second, emit an array of the recent click events
  33. *
  34. * ```ts
  35. * import { fromEvent } from 'rxjs';
  36. * import { bufferTime } from 'rxjs/operators';
  37. *
  38. * const clicks = fromEvent(document, 'click');
  39. * const buffered = clicks.pipe(bufferTime(1000));
  40. * buffered.subscribe(x => console.log(x));
  41. * ```
  42. *
  43. * Every 5 seconds, emit the click events from the next 2 seconds
  44. *
  45. * ```ts
  46. * import { fromEvent } from 'rxjs';
  47. * import { bufferTime } from 'rxjs/operators';
  48. *
  49. * const clicks = fromEvent(document, 'click');
  50. * const buffered = clicks.pipe(bufferTime(2000, 5000));
  51. * buffered.subscribe(x => console.log(x));
  52. * ```
  53. *
  54. * @see {@link buffer}
  55. * @see {@link bufferCount}
  56. * @see {@link bufferToggle}
  57. * @see {@link bufferWhen}
  58. * @see {@link windowTime}
  59. *
  60. * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
  61. * @param {number} [bufferCreationInterval] The interval at which to start new
  62. * buffers.
  63. * @param {number} [maxBufferSize] The maximum buffer size.
  64. * @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the
  65. * intervals that determine buffer boundaries.
  66. * @return {Observable<T[]>} An observable of arrays of buffered values.
  67. * @method bufferTime
  68. * @owner Observable
  69. */
  70. export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]> {
  71. let length: number = arguments.length;
  72. let scheduler: SchedulerLike = async;
  73. if (isScheduler(arguments[arguments.length - 1])) {
  74. scheduler = arguments[arguments.length - 1];
  75. length--;
  76. }
  77. let bufferCreationInterval: number = null;
  78. if (length >= 2) {
  79. bufferCreationInterval = arguments[1];
  80. }
  81. let maxBufferSize: number = Number.POSITIVE_INFINITY;
  82. if (length >= 3) {
  83. maxBufferSize = arguments[2];
  84. }
  85. return function bufferTimeOperatorFunction(source: Observable<T>) {
  86. return source.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
  87. };
  88. }
  89. class BufferTimeOperator<T> implements Operator<T, T[]> {
  90. constructor(private bufferTimeSpan: number,
  91. private bufferCreationInterval: number,
  92. private maxBufferSize: number,
  93. private scheduler: SchedulerLike) {
  94. }
  95. call(subscriber: Subscriber<T[]>, source: any): any {
  96. return source.subscribe(new BufferTimeSubscriber(
  97. subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler
  98. ));
  99. }
  100. }
  101. class Context<T> {
  102. buffer: T[] = [];
  103. closeAction: Subscription;
  104. }
  105. interface DispatchCreateArg<T> {
  106. bufferTimeSpan: number;
  107. bufferCreationInterval: number;
  108. subscriber: BufferTimeSubscriber<T>;
  109. scheduler: SchedulerLike;
  110. }
  111. interface DispatchCloseArg<T> {
  112. subscriber: BufferTimeSubscriber<T>;
  113. context: Context<T>;
  114. }
  115. /**
  116. * We need this JSDoc comment for affecting ESDoc.
  117. * @ignore
  118. * @extends {Ignored}
  119. */
  120. class BufferTimeSubscriber<T> extends Subscriber<T> {
  121. private contexts: Array<Context<T>> = [];
  122. private timespanOnly: boolean;
  123. constructor(destination: Subscriber<T[]>,
  124. private bufferTimeSpan: number,
  125. private bufferCreationInterval: number,
  126. private maxBufferSize: number,
  127. private scheduler: SchedulerLike) {
  128. super(destination);
  129. const context = this.openContext();
  130. this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
  131. if (this.timespanOnly) {
  132. const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
  133. this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  134. } else {
  135. const closeState = { subscriber: this, context };
  136. const creationState: DispatchCreateArg<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
  137. this.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, closeState));
  138. this.add(scheduler.schedule<DispatchCreateArg<T>>(dispatchBufferCreation, bufferCreationInterval, creationState));
  139. }
  140. }
  141. protected _next(value: T) {
  142. const contexts = this.contexts;
  143. const len = contexts.length;
  144. let filledBufferContext: Context<T>;
  145. for (let i = 0; i < len; i++) {
  146. const context = contexts[i];
  147. const buffer = context.buffer;
  148. buffer.push(value);
  149. if (buffer.length == this.maxBufferSize) {
  150. filledBufferContext = context;
  151. }
  152. }
  153. if (filledBufferContext) {
  154. this.onBufferFull(filledBufferContext);
  155. }
  156. }
  157. protected _error(err: any) {
  158. this.contexts.length = 0;
  159. super._error(err);
  160. }
  161. protected _complete() {
  162. const { contexts, destination } = this;
  163. while (contexts.length > 0) {
  164. const context = contexts.shift();
  165. destination.next(context.buffer);
  166. }
  167. super._complete();
  168. }
  169. /** @deprecated This is an internal implementation detail, do not use. */
  170. _unsubscribe() {
  171. this.contexts = null;
  172. }
  173. protected onBufferFull(context: Context<T>) {
  174. this.closeContext(context);
  175. const closeAction = context.closeAction;
  176. closeAction.unsubscribe();
  177. this.remove(closeAction);
  178. if (!this.closed && this.timespanOnly) {
  179. context = this.openContext();
  180. const bufferTimeSpan = this.bufferTimeSpan;
  181. const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
  182. this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  183. }
  184. }
  185. openContext(): Context<T> {
  186. const context: Context<T> = new Context<T>();
  187. this.contexts.push(context);
  188. return context;
  189. }
  190. closeContext(context: Context<T>) {
  191. this.destination.next(context.buffer);
  192. const contexts = this.contexts;
  193. const spliceIndex = contexts ? contexts.indexOf(context) : -1;
  194. if (spliceIndex >= 0) {
  195. contexts.splice(contexts.indexOf(context), 1);
  196. }
  197. }
  198. }
  199. function dispatchBufferTimeSpanOnly(this: SchedulerAction<any>, state: any) {
  200. const subscriber: BufferTimeSubscriber<any> = state.subscriber;
  201. const prevContext = state.context;
  202. if (prevContext) {
  203. subscriber.closeContext(prevContext);
  204. }
  205. if (!subscriber.closed) {
  206. state.context = subscriber.openContext();
  207. state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
  208. }
  209. }
  210. function dispatchBufferCreation<T>(this: SchedulerAction<DispatchCreateArg<T>>, state: DispatchCreateArg<T>) {
  211. const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
  212. const context = subscriber.openContext();
  213. const action = <SchedulerAction<DispatchCreateArg<T>>>this;
  214. if (!subscriber.closed) {
  215. subscriber.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
  216. action.schedule(state, bufferCreationInterval);
  217. }
  218. }
  219. function dispatchBufferClose<T>(arg: DispatchCloseArg<T>) {
  220. const { subscriber, context } = arg;
  221. subscriber.closeContext(context);
  222. }