Repositorio del curso CCOM4030 el semestre B91 del proyecto Artesanías con el Instituto de Cultura

expand.js 3.0KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  2. export function expand(project, concurrent = Number.POSITIVE_INFINITY, scheduler) {
  3. concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
  4. return (source) => source.lift(new ExpandOperator(project, concurrent, scheduler));
  5. }
  6. export class ExpandOperator {
  7. constructor(project, concurrent, scheduler) {
  8. this.project = project;
  9. this.concurrent = concurrent;
  10. this.scheduler = scheduler;
  11. }
  12. call(subscriber, source) {
  13. return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
  14. }
  15. }
  16. export class ExpandSubscriber extends SimpleOuterSubscriber {
  17. constructor(destination, project, concurrent, scheduler) {
  18. super(destination);
  19. this.project = project;
  20. this.concurrent = concurrent;
  21. this.scheduler = scheduler;
  22. this.index = 0;
  23. this.active = 0;
  24. this.hasCompleted = false;
  25. if (concurrent < Number.POSITIVE_INFINITY) {
  26. this.buffer = [];
  27. }
  28. }
  29. static dispatch(arg) {
  30. const { subscriber, result, value, index } = arg;
  31. subscriber.subscribeToProjection(result, value, index);
  32. }
  33. _next(value) {
  34. const destination = this.destination;
  35. if (destination.closed) {
  36. this._complete();
  37. return;
  38. }
  39. const index = this.index++;
  40. if (this.active < this.concurrent) {
  41. destination.next(value);
  42. try {
  43. const { project } = this;
  44. const result = project(value, index);
  45. if (!this.scheduler) {
  46. this.subscribeToProjection(result, value, index);
  47. }
  48. else {
  49. const state = { subscriber: this, result, value, index };
  50. const destination = this.destination;
  51. destination.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
  52. }
  53. }
  54. catch (e) {
  55. destination.error(e);
  56. }
  57. }
  58. else {
  59. this.buffer.push(value);
  60. }
  61. }
  62. subscribeToProjection(result, value, index) {
  63. this.active++;
  64. const destination = this.destination;
  65. destination.add(innerSubscribe(result, new SimpleInnerSubscriber(this)));
  66. }
  67. _complete() {
  68. this.hasCompleted = true;
  69. if (this.hasCompleted && this.active === 0) {
  70. this.destination.complete();
  71. }
  72. this.unsubscribe();
  73. }
  74. notifyNext(innerValue) {
  75. this._next(innerValue);
  76. }
  77. notifyComplete() {
  78. const buffer = this.buffer;
  79. this.active--;
  80. if (buffer && buffer.length > 0) {
  81. this._next(buffer.shift());
  82. }
  83. if (this.hasCompleted && this.active === 0) {
  84. this.destination.complete();
  85. }
  86. }
  87. }
  88. //# sourceMappingURL=expand.js.map