@@ -154,13 +154,14 @@ private function batchConsume()
154154 )
155155 ));
156156 } catch (Exception \StopConsumerException $ e ) {
157- $ this ->logger ->info ('Consumer requested restart ' , array (
157+ $ this ->logger ->info ('Consumer requested stop ' , array (
158158 'amqp ' => array (
159159 'queue ' => $ this ->queueOptions ['name ' ],
160160 'message ' => $ this ->messages ,
161161 'stacktrace ' => $ e ->getTraceAsString ()
162162 )
163163 ));
164+ $ this ->handleProcessMessages ($ e ->getHandleCode ());
164165 $ this ->resetBatch ();
165166 $ this ->stopConsuming ();
166167 } catch (\Exception $ e ) {
@@ -213,12 +214,14 @@ private function handleProcessFlag($deliveryTag, $processFlag)
213214 if ($ processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $ processFlag ) {
214215 // Reject and requeue message to RabbitMQ
215216 $ this ->getMessageChannel ($ deliveryTag )->basic_reject ($ deliveryTag , true );
216- } else if ($ processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE ) {
217+ } elseif ($ processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE ) {
217218 // NACK and requeue message to RabbitMQ
218219 $ this ->getMessageChannel ($ deliveryTag )->basic_nack ($ deliveryTag , false , true );
219- } else if ($ processFlag === ConsumerInterface::MSG_REJECT ) {
220+ } elseif ($ processFlag === ConsumerInterface::MSG_REJECT ) {
220221 // Reject and drop
221222 $ this ->getMessageChannel ($ deliveryTag )->basic_reject ($ deliveryTag , false );
223+ } elseif ($ processFlag == ConsumerInterface::MSG_ACK_SENT ) {
224+ // do nothing, ACK should be already sent
222225 } else {
223226 // Remove message from queue only if callback return not false
224227 $ this ->getMessageChannel ($ deliveryTag )->basic_ack ($ deliveryTag );
0 commit comments