|
2 | 2 |
|
3 | 3 | namespace palPalani\SqsQueueReader\Sqs; |
4 | 4 |
|
| 5 | +use Aws\Exception\AwsException; |
5 | 6 | use Illuminate\Queue\Jobs\SqsJob; |
6 | 7 | use Illuminate\Queue\SqsQueue; |
7 | 8 | use Illuminate\Support\Facades\Config; |
@@ -64,26 +65,31 @@ public function pop($queue = null) |
64 | 65 | { |
65 | 66 | $queue = $this->getQueue($queue); |
66 | 67 |
|
67 | | - $response = $this->sqs->receiveMessage([ |
68 | | - 'QueueUrl' => $queue, |
69 | | - 'AttributeNames' => ['ApproximateReceiveCount'], |
70 | | - 'MaxNumberOfMessages' => 5, |
71 | | - 'MessageAttributeNames' => ['All'], |
72 | | - ]); |
73 | | - |
74 | | - if (isset($response['Messages']) && count($response['Messages']) > 0) { |
75 | | - Log::debug('Messages==', [$response['Messages']]); |
76 | | - $queueId = explode('/', $queue); |
77 | | - $queueId = array_pop($queueId); |
78 | | - |
79 | | - $class = (array_key_exists($queueId, $this->container['config']->get('sqs-queue-reader.handlers'))) |
80 | | - ? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId] |
81 | | - : $this->container['config']->get('sqs-queue-reader.default-handler'); |
82 | | - |
83 | | - $response = $this->modifyPayload($response['Messages'], $class); |
84 | | - Log::debug('New $responseV2==', [$response]); |
85 | | - |
86 | | - return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue); |
| 68 | + try { |
| 69 | + $response = $this->sqs->receiveMessage([ |
| 70 | + 'QueueUrl' => $queue, |
| 71 | + 'AttributeNames' => ['ApproximateReceiveCount'], |
| 72 | + 'MaxNumberOfMessages' => 5, |
| 73 | + 'MessageAttributeNames' => ['All'], |
| 74 | + ]); |
| 75 | + |
| 76 | + if (isset($response['Messages']) && count($response['Messages']) > 0) { |
| 77 | + Log::debug('Messages==', [$response['Messages']]); |
| 78 | + $queueId = explode('/', $queue); |
| 79 | + $queueId = array_pop($queueId); |
| 80 | + |
| 81 | + $class = (array_key_exists($queueId, $this->container['config']->get('sqs-queue-reader.handlers'))) |
| 82 | + ? $this->container['config']->get('sqs-queue-reader.handlers')[$queueId] |
| 83 | + : $this->container['config']->get('sqs-queue-reader.default-handler'); |
| 84 | + |
| 85 | + $response = $this->modifyPayload($response['Messages'], $class); |
| 86 | + Log::debug('New $responseV2==', [$response]); |
| 87 | + |
| 88 | + return new SqsJob($this->container, $this->sqs, $response, $this->connectionName, $queue); |
| 89 | + } |
| 90 | + } catch (AwsException $e) { |
| 91 | + $msg = 'Line: '. $e->getLine() .', '. $e->getFile() . ', '. $e->getMessage(); |
| 92 | + throw new \RuntimeException("Aws SQS error: " . $msg); |
87 | 93 | } |
88 | 94 | } |
89 | 95 |
|
@@ -111,9 +117,19 @@ private function modifyPayload($payload, $class) |
111 | 117 |
|
112 | 118 | $body = []; |
113 | 119 | $attributes = []; |
| 120 | + $batchIds = []; |
| 121 | + |
114 | 122 | foreach ($payload as $item) { |
115 | 123 | //Log::debug('Each Messages==', [$item]); |
116 | | - $body[] = json_decode($item['Body'], true); |
| 124 | + //$body[] = json_decode($item['Body'], true); |
| 125 | + $body[] = [ |
| 126 | + 'messages' => json_decode($item['Body'], true), |
| 127 | + 'attributes' => $item['Attributes'], |
| 128 | + 'batchIds' => [ |
| 129 | + 'Id' => $item['MessageId'], |
| 130 | + 'ReceiptHandle' => $item['ReceiptHandle'], |
| 131 | + ] |
| 132 | + ]; |
117 | 133 | $attributes = $item['Attributes']; |
118 | 134 | $messageId = $item['MessageId']; |
119 | 135 | $receiptHandle = $item['ReceiptHandle']; |
|
0 commit comments