X-Git-Url: http://www.aleph1.co.uk/gitweb/?p=yaffs-website;a=blobdiff_plain;f=vendor%2Fguzzlehttp%2Fpromises%2Fsrc%2FEachPromise.php;fp=vendor%2Fguzzlehttp%2Fpromises%2Fsrc%2FEachPromise.php;h=d0ddf603fb3d566bc90a7f711b0723b876782983;hp=0000000000000000000000000000000000000000;hb=a2bd1bf0c2c1f1a17d188f4dc0726a45494cefae;hpb=57c063afa3f66b07c4bbddc2d6129a96d90f0aad diff --git a/vendor/guzzlehttp/promises/src/EachPromise.php b/vendor/guzzlehttp/promises/src/EachPromise.php new file mode 100644 index 000000000..d0ddf603f --- /dev/null +++ b/vendor/guzzlehttp/promises/src/EachPromise.php @@ -0,0 +1,229 @@ +iterable = iter_for($iterable); + + if (isset($config['concurrency'])) { + $this->concurrency = $config['concurrency']; + } + + if (isset($config['fulfilled'])) { + $this->onFulfilled = $config['fulfilled']; + } + + if (isset($config['rejected'])) { + $this->onRejected = $config['rejected']; + } + } + + public function promise() + { + if ($this->aggregate) { + return $this->aggregate; + } + + try { + $this->createPromise(); + $this->iterable->rewind(); + $this->refillPending(); + } catch (\Throwable $e) { + $this->aggregate->reject($e); + } catch (\Exception $e) { + $this->aggregate->reject($e); + } + + return $this->aggregate; + } + + private function createPromise() + { + $this->mutex = false; + $this->aggregate = new Promise(function () { + reset($this->pending); + if (empty($this->pending) && !$this->iterable->valid()) { + $this->aggregate->resolve(null); + return; + } + + // Consume a potentially fluctuating list of promises while + // ensuring that indexes are maintained (precluding array_shift). + while ($promise = current($this->pending)) { + next($this->pending); + $promise->wait(); + if ($this->aggregate->getState() !== PromiseInterface::PENDING) { + return; + } + } + }); + + // Clear the references when the promise is resolved. + $clearFn = function () { + $this->iterable = $this->concurrency = $this->pending = null; + $this->onFulfilled = $this->onRejected = null; + }; + + $this->aggregate->then($clearFn, $clearFn); + } + + private function refillPending() + { + if (!$this->concurrency) { + // Add all pending promises. + while ($this->addPending() && $this->advanceIterator()); + return; + } + + // Add only up to N pending promises. + $concurrency = is_callable($this->concurrency) + ? call_user_func($this->concurrency, count($this->pending)) + : $this->concurrency; + $concurrency = max($concurrency - count($this->pending), 0); + // Concurrency may be set to 0 to disallow new promises. + if (!$concurrency) { + return; + } + // Add the first pending promise. + $this->addPending(); + // Note this is special handling for concurrency=1 so that we do + // not advance the iterator after adding the first promise. This + // helps work around issues with generators that might not have the + // next value to yield until promise callbacks are called. + while (--$concurrency + && $this->advanceIterator() + && $this->addPending()); + } + + private function addPending() + { + if (!$this->iterable || !$this->iterable->valid()) { + return false; + } + + $promise = promise_for($this->iterable->current()); + $idx = $this->iterable->key(); + + $this->pending[$idx] = $promise->then( + function ($value) use ($idx) { + if ($this->onFulfilled) { + call_user_func( + $this->onFulfilled, $value, $idx, $this->aggregate + ); + } + $this->step($idx); + }, + function ($reason) use ($idx) { + if ($this->onRejected) { + call_user_func( + $this->onRejected, $reason, $idx, $this->aggregate + ); + } + $this->step($idx); + } + ); + + return true; + } + + private function advanceIterator() + { + // Place a lock on the iterator so that we ensure to not recurse, + // preventing fatal generator errors. + if ($this->mutex) { + return false; + } + + $this->mutex = true; + + try { + $this->iterable->next(); + $this->mutex = false; + return true; + } catch (\Throwable $e) { + $this->aggregate->reject($e); + $this->mutex = false; + return false; + } catch (\Exception $e) { + $this->aggregate->reject($e); + $this->mutex = false; + return false; + } + } + + private function step($idx) + { + // If the promise was already resolved, then ignore this step. + if ($this->aggregate->getState() !== PromiseInterface::PENDING) { + return; + } + + unset($this->pending[$idx]); + + // Only refill pending promises if we are not locked, preventing the + // EachPromise to recursively invoke the provided iterator, which + // cause a fatal error: "Cannot resume an already running generator" + if ($this->advanceIterator() && !$this->checkIfFinished()) { + // Add more pending promises if possible. + $this->refillPending(); + } + } + + private function checkIfFinished() + { + if (!$this->pending && !$this->iterable->valid()) { + // Resolve the promise if there's nothing left to do. + $this->aggregate->resolve(null); + return true; + } + + return false; + } +}