@@ -724,19 +724,27 @@ private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) thro
724724 }
725725
726726 ByteBuf messageBuffer = stream .read (messageHeader .getMessageLength () - MESSAGE_HEADER_LENGTH , additionalTimeout );
727+ boolean releaseMessageBuffer = true ;
728+ try {
729+ if (messageHeader .getOpCode () == OP_COMPRESSED .getValue ()) {
730+ CompressedHeader compressedHeader = new CompressedHeader (messageBuffer , messageHeader );
727731
728- if (messageHeader .getOpCode () == OP_COMPRESSED .getValue ()) {
729- CompressedHeader compressedHeader = new CompressedHeader (messageBuffer , messageHeader );
730-
731- Compressor compressor = getCompressor (compressedHeader );
732+ Compressor compressor = getCompressor (compressedHeader );
732733
733- ByteBuf buffer = getBuffer (compressedHeader .getUncompressedSize ());
734- compressor .uncompress (messageBuffer , buffer );
734+ ByteBuf buffer = getBuffer (compressedHeader .getUncompressedSize ());
735+ compressor .uncompress (messageBuffer , buffer );
735736
736- buffer .flip ();
737- return new ResponseBuffers (new ReplyHeader (buffer , compressedHeader ), buffer );
738- } else {
739- return new ResponseBuffers (new ReplyHeader (messageBuffer , messageHeader ), messageBuffer );
737+ buffer .flip ();
738+ return new ResponseBuffers (new ReplyHeader (buffer , compressedHeader ), buffer );
739+ } else {
740+ ResponseBuffers responseBuffers = new ResponseBuffers (new ReplyHeader (messageBuffer , messageHeader ), messageBuffer );
741+ releaseMessageBuffer = false ;
742+ return responseBuffers ;
743+ }
744+ } finally {
745+ if (releaseMessageBuffer ) {
746+ messageBuffer .release ();
747+ }
740748 }
741749 }
742750
@@ -792,6 +800,7 @@ public void onResult(final ByteBuf result, final Throwable t) {
792800 callback .onResult (null , t );
793801 return ;
794802 }
803+ boolean releaseResult = true ;
795804 try {
796805 ReplyHeader replyHeader ;
797806 ByteBuf responseBuffer ;
@@ -806,15 +815,21 @@ public void onResult(final ByteBuf result, final Throwable t) {
806815 replyHeader = new ReplyHeader (buffer , compressedHeader );
807816 responseBuffer = buffer ;
808817 } finally {
818+ releaseResult = false ;
809819 result .release ();
810820 }
811821 } else {
812822 replyHeader = new ReplyHeader (result , messageHeader );
813823 responseBuffer = result ;
824+ releaseResult = false ;
814825 }
815826 callback .onResult (new ResponseBuffers (replyHeader , responseBuffer ), null );
816827 } catch (Throwable localThrowable ) {
817828 callback .onResult (null , localThrowable );
829+ } finally {
830+ if (releaseResult ) {
831+ result .release ();
832+ }
818833 }
819834 }
820835 }
0 commit comments