Security update to Drupal 8.4.6
[yaffs-website] / vendor / guzzlehttp / promises / src / EachPromise.php
1 <?php
2 namespace GuzzleHttp\Promise;
3
4 /**
5  * Represents a promise that iterates over many promises and invokes
6  * side-effect functions in the process.
7  */
class EachPromise implements PromisorInterface
{
    /**
     * Derived promises that have not settled yet.
     *
     * Entries are keyed by an internal counter ($nextPendingIndex) rather
     * than by the iterator key, because iterator keys may repeat or may not
     * be valid array offsets.
     *
     * @var PromiseInterface[]|null
     */
    private $pending = [];

    /** @var int Key that the next entry added to $pending will receive. */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null */
    private $iterable;

    /** @var callable|int|null */
    private $concurrency;

    /** @var callable|null */
    private $onFulfilled;

    /** @var callable|null */
    private $onRejected;

    /** @var Promise|null */
    private $aggregate;

    /** @var bool Guards against re-entrant advancement of the iterator. */
    private $mutex;

    /**
     * Configuration hash can include the following key value pairs:
     *
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
     *   is invoked with three arguments: the fulfillment value, the index
     *   position from the iterable list of the promise, and the aggregate
     *   promise that manages all of the promises. The aggregate promise may
     *   be resolved from within the callback to short-circuit the promise.
     * - rejected: (callable) Invoked when a promise is rejected. The
     *   function is invoked with three arguments: the rejection reason, the
     *   index position from the iterable list of the promise, and the
     *   aggregate promise that manages all of the promises. The aggregate
     *   promise may be resolved from within the callback to short-circuit
     *   the promise.
     * - concurrency: (integer|callable) Pass this configuration option to
     *   limit the allowed number of outstanding concurrently executing
     *   promises, creating a capped pool of promises. When a callable is
     *   given it is invoked with the current number of pending promises and
     *   must return the limit to apply. There is no limit by default.
     *
     * @param mixed    $iterable Promises or values to iterate.
     * @param array    $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = iter_for($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and queueing the first
     * batch of pending promises on the first call.
     *
     * Subsequent calls return the same aggregate promise. Anything thrown
     * while rewinding or reading the iterator rejects the aggregate promise
     * instead of propagating to the caller.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        } catch (\Exception $e) {
            // PHP 5 has no \Throwable; catch \Exception explicitly there.
            $this->aggregate->reject($e);
        }

        return $this->aggregate;
    }

    /**
     * Creates the aggregate promise and registers the cleanup callback that
     * releases all held references once the aggregate settles.
     */
    private function createPromise()
    {
        $this->mutex = false;
        // The closure is the first argument of the Promise constructor; in
        // Guzzle's Promise API that is the function run when the promise is
        // waited on synchronously.
        $this->aggregate = new Promise(function () {
            reset($this->pending);
            if (empty($this->pending) && !$this->iterable->valid()) {
                $this->aggregate->resolve(null);
                return;
            }

            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            // The internal pointer is advanced before waiting because
            // settling a promise removes its entry from $pending.
            while ($promise = current($this->pending)) {
                next($this->pending);
                $promise->wait();
                if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function () {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Queues promises from the iterator until the concurrency limit is
     * reached, or until the iterator is exhausted when there is no limit.
     */
    private function refillPending()
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator()) {
                // Keep queueing; the loop condition does all the work.
            }
            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? call_user_func($this->concurrency, count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending()
        ) {
            // Keep queueing; the loop condition does all the work.
        }
    }

    /**
     * Wraps the iterator's current value in a promise and tracks it as
     * pending.
     *
     * @return bool False when there is no iterator or it has no current
     *              element; true when a promise was queued.
     */
    private function addPending()
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = promise_for($this->iterable->current());
        $key = $this->iterable->key();

        // Iterator keys are not guaranteed to be unique (e.g. generators
        // delegating with "yield from") nor to be valid array offsets, so
        // pending promises are tracked under an internal counter. Using the
        // iterator key here would let a later element overwrite one that is
        // still pending. The callbacks still receive the iterator key.
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key) {
                if ($this->onFulfilled) {
                    call_user_func(
                        $this->onFulfilled, $value, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected, $reason, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterator to its next element unless an advancement is
     * already in progress.
     *
     * An exception thrown by the iterator rejects the aggregate promise.
     *
     * @return bool True when the iterator was advanced; false when the lock
     *              was held or the iterator threw.
     */
    private function advanceIterator()
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;
            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        } catch (\Exception $e) {
            // PHP 5 has no \Throwable; catch \Exception explicitly there.
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        }
    }

    /**
     * Handles the settlement of one pending promise: forgets it, then either
     * finishes the aggregate or queues more work.
     *
     * @param int $idx Internal key of the settled promise in $pending.
     */
    private function step($idx)
    {
        // If the promise was already resolved, then ignore this step.
        if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise to recursively invoke the provided iterator, which
        // cause a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate promise with null when no promises are pending
     * and the iterator has no further elements.
     *
     * @return bool True when the aggregate was resolved by this call.
     */
    private function checkIfFinished()
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);
            return true;
        }

        return false;
    }
}