Skip to content

Commit 6c6c04b

Browse files
author
freyhsiao
committed
add retries cfg.
1 parent f627127 commit 6c6c04b

File tree

3 files changed

+30
-21
lines changed

3 files changed

+30
-21
lines changed

config/cmq.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
'host' => env('CMQ_QUEUE_HOST', 'https://cmq-queue-region.api.qcloud.com'),
2020
'name' => env('CMQ_QUEUE', 'default'),
2121
'polling_wait_seconds' => env('CMQ_QUEUE_POLLING_WAIT_SECONDS', 0), //0-30seconds
22+
'retries' => env('CMQ_QUEUE_RETRIES', 3),
2223
],
2324
'topic' => [
24-
'enable' => env('CMQ_TOPIC_ENABLE', false),
25-
'filter' => env('CMQ_TOPIC_FILTER', 'routing'), //routing or msgtag
26-
'host' => env('CMQ_TOPIC_HOST', 'https://cmq-topic-region.api.qcloud.com'),
27-
'name' => env('CMQ_TOPIC'),
25+
'enable' => env('CMQ_TOPIC_ENABLE', false),
26+
'filter' => env('CMQ_TOPIC_FILTER', 'routing'), //routing or msgtag
27+
'host' => env('CMQ_TOPIC_HOST', 'https://cmq-topic-region.api.qcloud.com'),
28+
'name' => env('CMQ_TOPIC'),
29+
'retries' => env('CMQ_TOPIC_RETRIES', 3),
2830
],
2931
],
3032

src/Queue/CMQQueue.php

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ class CMQQueue extends Queue implements QueueContract
3535
*/
3636
protected $plainOptions;
3737

38+
/**
39+
* @var \ReflectionMethod
40+
*/
41+
private static $createPayload;
42+
3843
public function __construct(Account $queueAccount, Account $topicAccount, array $config)
3944
{
4045
$this->queueAccount = $queueAccount;
@@ -44,6 +49,8 @@ public function __construct(Account $queueAccount, Account $topicAccount, array
4449
$this->topicOptions = $config['options']['topic'];
4550

4651
$this->plainOptions = Arr::get($config, 'plain', []);
52+
53+
self::$createPayload = new \ReflectionMethod($this, 'createPayload');
4754
}
4855

4956
/**
@@ -84,7 +91,6 @@ public function size($queue = null)
8491
* @param string $queue
8592
*
8693
* @return mixed
87-
* @throws \ReflectionException
8894
* @throws \Exception
8995
*/
9096
public function push($job, $data = '', $queue = null)
@@ -93,8 +99,7 @@ public function push($job, $data = '', $queue = null)
9399
return $this->pushRaw($job->getPayload(), $queue);
94100
}
95101

96-
$reflection = new \ReflectionMethod($this, 'createPayload');
97-
if ($reflection->getNumberOfParameters() === 3) { // version >= 5.7
102+
if (self::$createPayload->getNumberOfParameters() === 3) { // version >= 5.7
98103
$payload = $this->createPayload($job, $queue, $data);
99104
} else {
100105
$payload = $this->createPayload($job, $data);
@@ -124,21 +129,23 @@ public function pushRaw($payload, $queue = null, array $options = [])
124129
if ($driver instanceof Topic) {
125130
switch ($this->topicOptions['filter']) {
126131
case self::CMQ_TOPIC_TAG_FILTER_NAME:
127-
return retry(3, function () use ($driver, $message, $queue) {
128-
return $driver->publish_message($message->msgBody, explode(',', $queue), null);
129-
});
132+
return retry(Arr::get($this->topicOptions, 'retries', 1),
133+
function () use ($driver, $message, $queue) {
134+
return $driver->publish_message($message->msgBody, explode(',', $queue), null);
135+
});
130136
case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
131-
return retry(3, function () use ($driver, $message, $queue) {
132-
$driver->publish_message($message->msgBody, [], $queue);
133-
});
137+
return retry(Arr::get($this->topicOptions, 'retries', 1),
138+
function () use ($driver, $message, $queue) {
139+
$driver->publish_message($message->msgBody, [], $queue);
140+
});
134141
default:
135142
throw new \InvalidArgumentException(
136143
'Invalid CMQ topic filter: ' . $this->topicOptions['filter']
137144
);
138145
}
139146
}
140147

141-
return retry(3, function () use ($driver, $message, $options) {
148+
return retry(Arr::get($this->queueOptions, 'retries', 1), function () use ($driver, $message, $options) {
142149
return $driver->send_message($message, Arr::get($options, 'delay', 0));
143150
});
144151
}
@@ -152,7 +159,6 @@ public function pushRaw($payload, $queue = null, array $options = [])
152159
* @param string $queue
153160
*
154161
* @return mixed
155-
* @throws \ReflectionException
156162
* @throws \Exception
157163
*/
158164
public function later($delay, $job, $data = '', $queue = null)
@@ -165,8 +171,7 @@ public function later($delay, $job, $data = '', $queue = null)
165171
return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]);
166172
}
167173

168-
$reflection = new \ReflectionMethod($this, 'createPayload');
169-
if ($reflection->getNumberOfParameters() === 3) { // version >= 5.7
174+
if (self::$createPayload->getNumberOfParameters() === 3) { // version >= 5.7
170175
$payload = $this->createPayload($job, $queue, $data);
171176
} else {
172177
$payload = $this->createPayload($job, $data);

tests/CMQQueueTest.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ public function provider()
2929
'host' => env('CMQ_QUEUE_HOST', 'https://cmq-queue-region.api.qcloud.com'),
3030
'name' => env('CMQ_QUEUE', 'default'),
3131
'polling_wait_seconds' => env('CMQ_QUEUE_POLLING_WAIT_SECONDS', 0), //0-30seconds
32+
'retries' => env('CMQ_QUEUE_RETRIES', 1),
3233
],
3334
'topic' => [
34-
'enable' => env('CMQ_TOPIC_ENABLE', false),
35-
'filter' => env('CMQ_TOPIC_FILTER', 'routing'), //routing or msgtag
36-
'host' => env('CMQ_TOPIC_HOST', 'https://cmq-topic-region.api.qcloud.com'),
37-
'name' => env('CMQ_TOPIC'),
35+
'enable' => env('CMQ_TOPIC_ENABLE', false),
36+
'filter' => env('CMQ_TOPIC_FILTER', 'routing'), //routing or msgtag
37+
'host' => env('CMQ_TOPIC_HOST', 'https://cmq-topic-region.api.qcloud.com'),
38+
'name' => env('CMQ_TOPIC'),
39+
'retries' => env('CMQ_TOPIC_RETRIES', 1),
3840
],
3941
],
4042

0 commit comments

Comments
 (0)