No Description

EachPromise.php 7.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. <?php
  2. namespace GuzzleHttp\Promise;
  /**
   * Represents a promise that iterates over many promises and invokes
   * side-effect functions in the process.
   *
   * The aggregate promise returned by promise() is resolved with null once
   * the iterable is exhausted and no promise is outstanding, and is rejected
   * if rewinding or advancing the iterable throws.
   */
  class EachPromise implements PromisorInterface
  {
      /**
       * Outstanding promises, keyed by an internal counter rather than by the
       * iterator key (iterator keys may repeat or may not be valid array keys).
       *
       * @var array|null
       */
      private $pending = [];

      /** @var int Next unused key for $pending. */
      private $nextPendingIndex = 0;

      /** @var \Iterator */
      private $iterable;

      /** @var callable|int */
      private $concurrency;

      /** @var callable */
      private $onFulfilled;

      /** @var callable */
      private $onRejected;

      /** @var Promise */
      private $aggregate;

      /** @var bool True while the iterator is being advanced. */
      private $mutex;

      /**
       * Configuration hash can include the following key value pairs:
       *
       * - fulfilled: (callable) Invoked when a promise fulfills. The function
       *   is invoked with three arguments: the fulfillment value, the key
       *   the iterable reported for the promise, and the aggregate promise
       *   that manages all of the promises. The aggregate promise may be
       *   resolved from within the callback to short-circuit the promise.
       * - rejected: (callable) Invoked when a promise is rejected. The
       *   function is invoked with three arguments: the rejection reason, the
       *   key the iterable reported for the promise, and the aggregate
       *   promise that manages all of the promises. The aggregate promise may
       *   be resolved from within the callback to short-circuit the promise.
       * - concurrency: (integer|callable) Pass this configuration option to
       *   limit the allowed number of outstanding concurrently executing
       *   promises, creating a capped pool of promises. A callable is invoked
       *   with the current number of pending promises and must return the
       *   limit. There is no limit by default.
       *
       * @param mixed $iterable Promises or values to iterate.
       * @param array $config   Configuration options
       */
      public function __construct($iterable, array $config = [])
      {
          // iter_for() is defined elsewhere in the package; the rest of this
          // class relies on it returning an \Iterator.
          $this->iterable = iter_for($iterable);

          if (isset($config['concurrency'])) {
              $this->concurrency = $config['concurrency'];
          }

          if (isset($config['fulfilled'])) {
              $this->onFulfilled = $config['fulfilled'];
          }

          if (isset($config['rejected'])) {
              $this->onRejected = $config['rejected'];
          }
      }

      /**
       * Returns the aggregate promise, creating it and queueing the first
       * batch of promises on the first call.
       *
       * @return Promise
       */
      public function promise()
      {
          if ($this->aggregate) {
              return $this->aggregate;
          }

          try {
              $this->createPromise();
              $this->iterable->rewind();
              // With an empty iterable nothing is ever queued that could call
              // step(), so the aggregate would stay pending until someone
              // called wait(). Settle it right away instead.
              if (!$this->checkIfFinished()) {
                  $this->refillPending();
              }
          } catch (\Throwable $e) {
              $this->aggregate->reject($e);
          } catch (\Exception $e) {
              // Kept for PHP 5, where \Throwable does not exist.
              $this->aggregate->reject($e);
          }

          return $this->aggregate;
      }

      /**
       * Builds the aggregate promise and registers the cleanup that drops all
       * references once it settles.
       */
      private function createPromise()
      {
          $this->mutex = false;
          $this->aggregate = new Promise(function () {
              if ($this->checkIfFinished()) {
                  return;
              }
              reset($this->pending);
              // Consume a potentially fluctuating list of promises while
              // ensuring that indexes are maintained (precluding array_shift).
              while ($promise = current($this->pending)) {
                  next($this->pending);
                  $promise->wait();
                  if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                      return;
                  }
              }
          });

          // Clear the references when the promise is resolved.
          $clearFn = function () {
              $this->iterable = $this->concurrency = $this->pending = null;
              $this->onFulfilled = $this->onRejected = null;
              $this->nextPendingIndex = 0;
          };

          $this->aggregate->then($clearFn, $clearFn);
      }

      /**
       * Queues promises from the iterable: all of them when no concurrency is
       * configured, otherwise only as many as the limit still allows.
       */
      private function refillPending()
      {
          if (!$this->concurrency) {
              // Add all pending promises.
              while ($this->addPending() && $this->advanceIterator());
              return;
          }

          // Add only up to N pending promises.
          $concurrency = is_callable($this->concurrency)
              ? call_user_func($this->concurrency, count($this->pending))
              : $this->concurrency;
          $concurrency = max($concurrency - count($this->pending), 0);

          // Concurrency may be set to 0 to disallow new promises.
          if (!$concurrency) {
              return;
          }

          // Add the first pending promise.
          $this->addPending();

          // Note this is special handling for concurrency=1 so that we do
          // not advance the iterator after adding the first promise. This
          // helps work around issues with generators that might not have the
          // next value to yield until promise callbacks are called.
          while (--$concurrency
              && $this->advanceIterator()
              && $this->addPending());
      }

      /**
       * Queues the iterable's current element as a pending promise.
       *
       * @return bool False when the iterable is gone or exhausted, true when a
       *              promise was queued.
       */
      private function addPending()
      {
          if (!$this->iterable || !$this->iterable->valid()) {
              return false;
          }

          $promise = promise_for($this->iterable->current());
          // The iterator key is only reported to the callbacks.
          $key = $this->iterable->key();

          // Iterator keys may repeat (e.g. generators) or be unusable as array
          // keys, so bookkeeping uses a counter that is unique per promise.
          // Using the iterator key here would let a later promise overwrite an
          // outstanding one and make the aggregate resolve too early.
          $idx = $this->nextPendingIndex++;

          $this->pending[$idx] = $promise->then(
              function ($value) use ($idx, $key) {
                  if ($this->onFulfilled) {
                      call_user_func(
                          $this->onFulfilled,
                          $value,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              },
              function ($reason) use ($idx, $key) {
                  if ($this->onRejected) {
                      call_user_func(
                          $this->onRejected,
                          $reason,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              }
          );

          return true;
      }

      /**
       * Moves the iterable to its next element.
       *
       * @return bool True when the iterable was advanced; false when an
       *              advance is already in progress or next() threw (in which
       *              case the aggregate is rejected with the error).
       */
      private function advanceIterator()
      {
          // Place a lock on the iterator so that we ensure to not recurse,
          // preventing fatal generator errors.
          if ($this->mutex) {
              return false;
          }

          $this->mutex = true;

          try {
              $this->iterable->next();
              $this->mutex = false;
              return true;
          } catch (\Throwable $e) {
              $this->aggregate->reject($e);
              $this->mutex = false;
              return false;
          } catch (\Exception $e) {
              // Kept for PHP 5, where \Throwable does not exist.
              $this->aggregate->reject($e);
              $this->mutex = false;
              return false;
          }
      }

      /**
       * Runs after one pending promise settles: removes it from the pending
       * list and queues more work or finishes the aggregate.
       *
       * @param int $idx Key of the settled promise in $pending.
       */
      private function step($idx)
      {
          // If the promise was already resolved, then ignore this step.
          if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
              return;
          }

          unset($this->pending[$idx]);

          // Only refill pending promises if we are not locked, preventing the
          // EachPromise to recursively invoke the provided iterator, which
          // cause a fatal error: "Cannot resume an already running generator"
          if ($this->advanceIterator() && !$this->checkIfFinished()) {
              // Add more pending promises if possible.
              $this->refillPending();
          }
      }

      /**
       * Resolves the aggregate with null when nothing is pending and the
       * iterable is exhausted.
       *
       * @return bool True when the aggregate was resolved by this call.
       */
      private function checkIfFinished()
      {
          if (!$this->pending && !$this->iterable->valid()) {
              // Resolve the promise if there's nothing left to do.
              $this->aggregate->resolve(null);
              return true;
          }

          return false;
      }
  }