Skip to content

Commit f5cf0d4

Browse files
[12.x] Add concurrency control to Http::pool and Http::batch (#57555)
* Added concurrency to Http::pool * Added concurrency to Http::batch * formatting --------- Co-authored-by: Taylor Otwell <taylor@laravel.com>
1 parent b5e9003 commit f5cf0d4

File tree

3 files changed

+109
-4
lines changed

3 files changed

+109
-4
lines changed

src/Illuminate/Http/Client/Batch.php

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ class Batch
114114
*/
115115
public $finishedAt = null;
116116

117+
/**
118+
* The maximum number of concurrent requests.
119+
*
120+
* @var int|null
121+
*/
122+
protected $concurrencyLimit = null;
123+
117124
/**
118125
* Create a new request batch instance.
119126
*/
@@ -208,6 +215,19 @@ public function finally(Closure $callback): self
208215
return $this;
209216
}
210217

218+
/**
219+
* Set the maximum number of concurrent requests.
220+
*
221+
* @param int $limit
222+
* @return Batch
223+
*/
224+
public function concurrency(int $limit): self
225+
{
226+
$this->concurrencyLimit = $limit;
227+
228+
return $this;
229+
}
230+
211231
/**
212232
* Defer the batch to run in the background after the current task has finished.
213233
*
@@ -244,7 +264,7 @@ public function send(): array
244264
}
245265

246266
if (! empty($promises)) {
247-
(new EachPromise($promises, [
267+
$eachPromiseOptions = [
248268
'fulfilled' => function ($result, $key) use (&$results) {
249269
$results[$key] = $result;
250270

@@ -285,7 +305,13 @@ public function send(): array
285305

286306
return $reason;
287307
},
288-
]))->promise()->wait();
308+
];
309+
310+
if ($this->concurrencyLimit !== null) {
311+
$eachPromiseOptions['concurrency'] = $this->concurrencyLimit;
312+
}
313+
314+
(new EachPromise($promises, $eachPromiseOptions))->promise()->wait();
289315
}
290316

291317
// Before returning the results, we must ensure that the results are sorted

src/Illuminate/Http/Client/PendingRequest.php

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use GuzzleHttp\Exception\TransferException;
1212
use GuzzleHttp\HandlerStack;
1313
use GuzzleHttp\Middleware;
14+
use GuzzleHttp\Promise\EachPromise;
1415
use GuzzleHttp\UriTemplate\UriTemplate;
1516
use Illuminate\Contracts\Support\Arrayable;
1617
use Illuminate\Http\Client\Events\ConnectionFailed;
@@ -884,18 +885,39 @@ public function delete(string $url, $data = [])
884885
* Send a pool of asynchronous requests concurrently.
885886
*
886887
* @param callable $callback
888+
* @param int|null $concurrency
887889
* @return array<array-key, \Illuminate\Http\Client\Response>
888890
*/
889-
public function pool(callable $callback)
891+
public function pool(callable $callback, ?int $concurrency = null)
890892
{
891893
$results = [];
892894

893895
$requests = tap(new Pool($this->factory), $callback)->getRequests();
894896

897+
if ($concurrency === null) {
898+
foreach ($requests as $key => $item) {
899+
$results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait();
900+
}
901+
902+
return $results;
903+
}
904+
905+
$promises = [];
906+
895907
foreach ($requests as $key => $item) {
896-
$results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait();
908+
$promises[$key] = $item instanceof static ? $item->getPromise() : $item;
897909
}
898910

911+
(new EachPromise($promises, [
912+
'fulfilled' => function ($result, $key) use (&$results) {
913+
$results[$key] = $result;
914+
},
915+
'rejected' => function ($reason, $key) use (&$results) {
916+
$results[$key] = $reason;
917+
},
918+
'concurrency' => $concurrency,
919+
]))->promise()->wait();
920+
899921
return $results;
900922
}
901923

tests/Http/HttpClientTest.php

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1910,6 +1910,27 @@ public function testMiddlewareRunsInPool()
19101910
$this->assertSame(['hyped-for' => 'laravel-movie'], json_decode(tap($history[0]['request']->getBody())->rewind()->getContents(), true));
19111911
}
19121912

1913+
public function testPoolConcurrency()
1914+
{
1915+
$this->factory->fake([
1916+
'200.com' => $this->factory::response('', 200),
1917+
'400.com' => $this->factory::response('', 400),
1918+
'500.com' => $this->factory::response('', 500),
1919+
]);
1920+
1921+
$responses = $this->factory->pool(function (Pool $pool) {
1922+
return [
1923+
$pool->get('200.com'),
1924+
$pool->get('400.com'),
1925+
$pool->get('500.com'),
1926+
];
1927+
}, 2);
1928+
1929+
$this->assertSame(200, $responses[0]->status());
1930+
$this->assertSame(400, $responses[1]->status());
1931+
$this->assertSame(500, $responses[2]->status());
1932+
}
1933+
19131934
public function testTheRequestSendingAndResponseReceivedEventsAreFiredWhenARequestIsSent()
19141935
{
19151936
$events = m::mock(Dispatcher::class);
@@ -4068,6 +4089,42 @@ public function testBatchFinallyHookIsCalledWithErrors(): void
40684089
$this->assertSame($responses['second'], $finallyCallback['second']);
40694090
}
40704091

4092+
public function testBatchConcurrency(): void
4093+
{
4094+
$this->factory->fake([
4095+
'https://200.com' => $this->factory::response('OK', 200),
4096+
'https://201.com' => $this->factory::response('Created', 201),
4097+
'https://202.com' => $this->factory::response('Accepted', 202),
4098+
'https://203.com' => $this->factory::response('Non-Authoritative Information', 203),
4099+
]);
4100+
4101+
$executionOrder = [];
4102+
4103+
$batch = $this->factory->batch(function (Batch $batch) {
4104+
return [
4105+
$batch->as('first')->get('https://200.com'),
4106+
$batch->as('second')->get('https://201.com'),
4107+
$batch->as('third')->get('https://202.com'),
4108+
$batch->as('fourth')->get('https://203.com'),
4109+
];
4110+
})->progress(function (Batch $batch, int|string $key, Response $response) use (&$executionOrder) {
4111+
$executionOrder[] = $key;
4112+
})->concurrency(2);
4113+
4114+
$this->assertSame(4, $batch->totalRequests);
4115+
4116+
$responses = $batch->send();
4117+
4118+
$this->assertSame(200, $responses['first']->status());
4119+
$this->assertSame(201, $responses['second']->status());
4120+
$this->assertSame(202, $responses['third']->status());
4121+
$this->assertSame(203, $responses['fourth']->status());
4122+
4123+
$this->assertSame(4, $batch->totalRequests);
4124+
$this->assertSame(0, $batch->pendingRequests);
4125+
$this->assertSame(0, $batch->failedRequests);
4126+
}
4127+
40714128
public static function methodsReceivingArrayableDataProvider()
40724129
{
40734130
return [

0 commit comments

Comments
 (0)