diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index 348349fd18c..85a87a80541 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -149,48 +149,43 @@ public final class CommandMessage extends RequestMessage { * `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections. */ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) { - List byteBuffers = bsonOutput.getByteBuffers(); + CompositeByteBuf byteBuf = new CompositeByteBuf(bsonOutput.getByteBuffers()); try { - CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers); - try { - byteBuf.position(firstDocumentPosition); - ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf); - - // If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG - if (byteBuf.hasRemaining()) { - BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); - - // Each loop iteration processes one Document Sequence - // When there are no more bytes remaining, there are no more Document Sequences - while (byteBuf.hasRemaining()) { - // skip reading the payload type, we know it is `PAYLOAD_TYPE_1` - byteBuf.position(byteBuf.position() + 1); - int sequenceStart = byteBuf.position(); - int sequenceSizeInBytes = byteBuf.getInt(); - int sectionEnd = sequenceStart + sequenceSizeInBytes; - - String fieldName = getSequenceIdentifier(byteBuf); - // If this assertion fires, it means that the driver has started using document sequences for nested fields. If - // so, this method will need to change in order to append the value to the correct nested document. - assertFalse(fieldName.contains(".")); - - ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd); - try { - commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice))); - } finally { - documentsByteBufSlice.release(); - } - byteBuf.position(sectionEnd); + byteBuf.position(firstDocumentPosition); + ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf); + + // If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG + if (byteBuf.hasRemaining()) { + BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); + + // Each loop iteration processes one Document Sequence + // When there are no more bytes remaining, there are no more Document Sequences + while (byteBuf.hasRemaining()) { + // skip reading the payload type, we know it is `PAYLOAD_TYPE_1` + byteBuf.position(byteBuf.position() + 1); + int sequenceStart = byteBuf.position(); + int sequenceSizeInBytes = byteBuf.getInt(); + int sectionEnd = sequenceStart + sequenceSizeInBytes; + + String fieldName = getSequenceIdentifier(byteBuf); + // If this assertion fires, it means that the driver has started using document sequences for nested fields. If + // so, this method will need to change in order to append the value to the correct nested document. + assertFalse(fieldName.contains(".")); + + ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd); + try { + commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice))); + } finally { + documentsByteBufSlice.release(); } - return commandBsonDocument; - } else { - return byteBufBsonDocument; + byteBuf.position(sectionEnd); } - } finally { - byteBuf.release(); + return commandBsonDocument; + } else { + return byteBufBsonDocument; } } finally { - byteBuffers.forEach(ByteBuf::release); + byteBuf.release(); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java index a3ce668040c..d4ea250dd77 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.assertions.Assertions; import org.bson.ByteBuf; import java.nio.Buffer; @@ -50,7 +51,11 @@ class CompositeByteBuf implements ByteBuf { } CompositeByteBuf(final CompositeByteBuf from) { - components = from.components; + notNull("from", from); + components = new ArrayList<>(from.components.size()); + from.components.forEach(component -> + components.add(new Component(component.buffer.duplicate(), component.offset)) + ); position = from.position(); limit = from.limit(); } @@ -306,6 +311,7 @@ public ByteBuf retain() { referenceCount.decrementAndGet(); throw new IllegalStateException("Attempted to increment the reference count when it is already 0"); } + components.forEach(c -> c.buffer.retain()); return this; } @@ -315,6 +321,11 @@ public void release() { referenceCount.incrementAndGet(); throw new IllegalStateException("Attempted to decrement the reference count below 0"); } + components.forEach(c -> c.buffer.release()); + if (referenceCount.get() == 0) { + Assertions.assertTrue(components.stream().noneMatch(c -> c.buffer.getReferenceCount() > 0), + "All component buffers should have reference count 0 when CompositeByteBuf is fully released, but some still have references."); + } } private Component findComponent(final int index) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java index c81cc87dee6..14b11c3c31e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java @@ -20,12 +20,13 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.concurrent.atomic.AtomicInteger; /** *

This class is not part of the public API and may be removed or changed at any time

*/ public final class NettyByteBuf implements ByteBuf { - + private final AtomicInteger referenceCount = new AtomicInteger(1); private io.netty.buffer.ByteBuf proxied; private boolean isWriting = true; @@ -271,17 +272,25 @@ public ByteBuffer asNIO() { @Override public int getReferenceCount() { - return proxied.refCnt(); + return referenceCount.get(); } @Override public ByteBuf retain() { + if (referenceCount.incrementAndGet() == 1) { + referenceCount.decrementAndGet(); + throw new IllegalStateException("Attempted to increment the reference count when it is already 0"); + } proxied.retain(); return this; } @Override public void release() { + if (referenceCount.decrementAndGet() < 0) { + referenceCount.incrementAndGet(); + throw new IllegalStateException("Attempted to decrement the reference count below 0"); + } proxied.release(); } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy index aaf8a788da3..642ce0f1023 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy @@ -44,6 +44,7 @@ class CommandHelperSpecification extends Specification { InternalConnection connection def setup() { + InternalStreamConnection.setRecordEverything(true) // Ensures implementation can log as expected connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE, new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()), getCredentialWithCache(), CLIENT_METADATA, [], LoggerSettings.builder().build(), null, getServerApi()) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufSpecification.groovy deleted file mode 100644 index 6c43e2a667e..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufSpecification.groovy +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection - -import org.bson.ByteBufNIO -import spock.lang.Specification - -import java.nio.ByteBuffer -import java.nio.ByteOrder - -class CompositeByteBufSpecification extends Specification { - - def 'should throw if buffers is null'() { - when: - new CompositeByteBuf(null) - - then: - thrown(IllegalArgumentException) - } - - def 'should throw if buffers is empty'() { - when: - new CompositeByteBuf([]) - - then: - thrown(IllegalArgumentException) - } - - def 'reference count should be maintained'() { - when: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]) - - then: - buf.getReferenceCount() == 1 - - when: - buf.retain() - - then: - buf.getReferenceCount() == 2 - - when: - buf.release() - - then: - buf.getReferenceCount() == 1 - - when: - buf.release() - - then: - buf.getReferenceCount() == 0 - - when: - buf.release() - - then: - thrown(IllegalStateException) - - when: - buf.retain() - - then: - thrown(IllegalStateException) - } - - def 'order should throw if not little endian'() { - when: - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]).order(ByteOrder.BIG_ENDIAN) - - then: - thrown(UnsupportedOperationException) - } - - def 'order should return normally if little endian'() { - when: - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]).order(ByteOrder.LITTLE_ENDIAN) - - then: - true - } - - def 'limit should be sum of limits of buffers'() { - expect: - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]).limit() == 4 - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[])), - new ByteBufNIO(ByteBuffer.wrap([1, 2] as byte[]))]).limit() == 6 - } - - def 'capacity should be the initial limit'() { - expect: - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]).capacity() == 4 - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[])), - new ByteBufNIO(ByteBuffer.wrap([1, 2] as byte[]))]).capacity() == 6 - } - - def 'position should be 0'() { - expect: - new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]).position() == 0 - } - - def 'position should be set if in range'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3] as byte[]))]) - - when: - buf.position(0) - - then: - buf.position() == 0 - - when: - buf.position(1) - - then: - buf.position() == 1 - - when: - buf.position(2) - - then: - buf.position() == 2 - - when: - buf.position(3) - - then: - buf.position() == 3 - } - - def 'position should throw if out of range'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3] as byte[]))]) - - when: - buf.position(-1) - - then: - thrown(IndexOutOfBoundsException) - - when: - buf.position(4) - - then: - thrown(IndexOutOfBoundsException) - - and: - buf.limit(2) - - when: - buf.position(3) - - then: - thrown(IndexOutOfBoundsException) - } - - def 'limit should be set if in range'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3] as byte[]))]) - - when: - buf.limit(0) - - then: - buf.limit() == 0 - - when: - buf.limit(1) - - then: - buf.limit() == 1 - - when: - buf.limit(2) - - then: - buf.limit() == 2 - - when: - buf.limit(3) - - then: - buf.limit() == 3 - } - - def 'limit should throw if out of range'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3] as byte[]))]) - - when: - buf.limit(-1) - - then: - thrown(IndexOutOfBoundsException) - - when: - buf.limit(4) - - then: - thrown(IndexOutOfBoundsException) - } - - def 'clear should reset position and limit'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3] as byte[]))]) - buf.limit(2) - buf.get() - - when: - buf.clear() - - then: - buf.position() == 0 - buf.limit() == 3 - } - - - def 'duplicate should copy all properties'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 1, 2, 3, 4, 1, 2] as byte[]))]) - buf.limit(6) - buf.get() - buf.get() - - when: - def duplicate = buf.duplicate() - - then: - duplicate.position() == 2 - duplicate.limit() == 6 - duplicate.getInt() == 67305985 - !duplicate.hasRemaining() - buf.position() == 2 - } - - - def 'position, remaining, and hasRemaining should update as bytes are read'() { - when: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]) - - then: - buf.position() == 0 - buf.remaining() == 4 - buf.hasRemaining() - - when: - buf.get() - - then: - buf.position() == 1 - buf.remaining() == 3 - buf.hasRemaining() - - when: - buf.get() - - then: - buf.position() == 2 - buf.remaining() == 2 - buf.hasRemaining() - - when: - buf.get() - - then: - buf.position() == 3 - buf.remaining() == 1 - buf.hasRemaining() - - when: - buf.get() - - then: - buf.position() == 4 - buf.remaining() == 0 - !buf.hasRemaining() - } - - def 'absolute getInt should read little endian integer and preserve position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def i = buf.getInt(0) - - then: - i == 67305985 - buf.position() == 0 - byteBuffer.position() == 0 - } - - def 'absolute getInt should read little endian integer when integer is split accross buffers'() { - given: - def byteBufferOne = new ByteBufNIO(ByteBuffer.wrap([1, 2] as byte[])) - def byteBufferTwo = new ByteBufNIO(ByteBuffer.wrap([3, 4] as byte[])) - def buf = new CompositeByteBuf([byteBufferOne, byteBufferTwo]) - - when: - def i = buf.getInt(0) - - then: - i == 67305985 - buf.position() == 0 - byteBufferOne.position() == 0 - byteBufferTwo.position() == 0 - } - - def 'relative getInt should read little endian integer and move position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def i = buf.getInt() - - then: - i == 67305985 - buf.position() == 4 - byteBuffer.position() == 0 - } - - def 'absolute getLong should read little endian long and preserve position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def l = buf.getLong(0) - - then: - l == 578437695752307201L - buf.position() == 0 - byteBuffer.position() == 0 - } - - def 'absolute getLong should read little endian long when double is split accross buffers'() { - given: - def byteBufferOne = new ByteBufNIO(ByteBuffer.wrap([1, 2] as byte[])) - def byteBufferTwo = new ByteBufNIO(ByteBuffer.wrap([3, 4] as byte[])) - def byteBufferThree = new ByteBufNIO(ByteBuffer.wrap([5, 6] as byte[])) - def byteBufferFour = new ByteBufNIO(ByteBuffer.wrap([7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBufferOne, byteBufferTwo, byteBufferThree, byteBufferFour]) - - when: - def l = buf.getLong(0) - - then: - l == 578437695752307201L - buf.position() == 0 - byteBufferOne.position() == 0 - byteBufferTwo.position() == 0 - } - - def 'relative getLong should read little endian long and move position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def l = buf.getLong() - - then: - l == 578437695752307201L - buf.position() == 8 - byteBuffer.position() == 0 - } - - def 'absolute getDouble should read little endian double and preserve position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def d = buf.getDouble(0) - - then: - d == 5.447603722011605E-270 as double - buf.position() == 0 - byteBuffer.position() == 0 - } - - def 'relative getDouble should read little endian double and move position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def d = buf.getDouble() - - then: - d == 5.447603722011605E-270 as double - buf.position() == 8 - byteBuffer.position() == 0 - } - - def 'absolute bulk get should read bytes and preserve position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def bytes = new byte[4] - buf.get(0, bytes) - - then: - bytes == [1, 2, 3, 4] as byte[] - buf.position() == 0 - byteBuffer.position() == 0 - } - - def 'absolute bulk get should read bytes when split across buffers'() { - given: - def byteBufferOne = new ByteBufNIO(ByteBuffer.wrap([1] as byte[])) - def byteBufferTwo = new ByteBufNIO(ByteBuffer.wrap([2, 3] as byte[])) - def byteBufferThree = new ByteBufNIO(ByteBuffer.wrap([4, 5, 6] as byte[])) - def byteBufferFour = new ByteBufNIO(ByteBuffer.wrap([7, 8, 9, 10] as byte[])) - def byteBufferFive = new ByteBufNIO(ByteBuffer.wrap([11] as byte[])) - def byteBufferSix = new ByteBufNIO(ByteBuffer.wrap([12] as byte[])) - def buf = new CompositeByteBuf([byteBufferOne, byteBufferTwo, byteBufferThree, byteBufferFour, - byteBufferFive, byteBufferSix]) - - when: - def bytes = new byte[16] - buf.get(2, bytes, 4, 9) - - then: - bytes == [0, 0, 0, 0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0] as byte[] - buf.position() == 0 - } - - def 'relative bulk get should read bytes and move position'() { - given: - def byteBuffer = new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[])) - def buf = new CompositeByteBuf([byteBuffer]) - - when: - def bytes = new byte[4] - buf.get(bytes) - - then: - bytes == [1, 2, 3, 4] as byte[] - buf.position() == 4 - byteBuffer.position() == 0 - - when: - bytes = new byte[8] - buf.get(bytes, 4, 3) - - then: - bytes == [0, 0, 0, 0, 5, 6, 7, 0] as byte[] - buf.position() == 7 - byteBuffer.position() == 0 - } - - def 'should get as NIO ByteBuffer'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4, 5, 6, 7, 8] as byte[]))]) - - when: - buf.position(1).limit(5) - def nio = buf.asNIO() - - then: - nio.position() == 1 - nio.limit(5) - def bytes = new byte[4] - nio.get(bytes) - bytes == [2, 3, 4, 5] as byte[] - } - - def 'should get as NIO ByteBuffer with multiple buffers'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2] as byte[])), - new ByteBufNIO(ByteBuffer.wrap([3, 4, 5] as byte[])), - new ByteBufNIO(ByteBuffer.wrap([6, 7, 8, 9] as byte[]))]) - - when: - buf.position(1).limit(6) - def nio = buf.asNIO() - - then: - nio.position() == 0 - nio.limit(5) - def bytes = new byte[5] - nio.get(bytes) - bytes == [2, 3, 4, 5, 6] as byte[] - } - - def 'should throw IndexOutOfBoundsException if reading out of bounds'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]) - buf.position(4) - - when: - buf.get() - - then: - thrown(IndexOutOfBoundsException) - - when: - buf.position(1) - buf.getInt() - - then: - thrown(IndexOutOfBoundsException) - - when: - buf.position(0) - buf.get(new byte[2], 1, 2) - - then: - thrown(IndexOutOfBoundsException) - } - - def 'should throw IllegalStateException if buffer is closed'() { - given: - def buf = new CompositeByteBuf([new ByteBufNIO(ByteBuffer.wrap([1, 2, 3, 4] as byte[]))]) - buf.release() - - when: - buf.get() - - then: - thrown(IllegalStateException) - } -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufTest.java new file mode 100644 index 00000000000..8e2d0dfe94c --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CompositeByteBufTest.java @@ -0,0 +1,427 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.connection; + +import com.mongodb.internal.connection.netty.NettyByteBuf; +import org.bson.ByteBuf; +import org.bson.ByteBufNIO; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.stream.Stream; + +import static io.netty.buffer.Unpooled.copiedBuffer; +import static io.netty.buffer.Unpooled.wrappedBuffer; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public final class CompositeByteBufTest { + + @Test + @SuppressWarnings("ConstantConditions") + void shouldThrowIfBuffersIsNull() { + assertThrows(IllegalArgumentException.class, () -> new CompositeByteBuf((List) null)); + } + + @Test + void shouldThrowIfBuffersIsEmpty() { + assertThrows(IllegalArgumentException.class, () -> new CompositeByteBuf(emptyList())); + } + + @DisplayName("referenceCount should be maintained") + @ParameterizedTest + @MethodSource("getBuffers") + void referenceCountShouldBeMaintained(final List buffers) { + CompositeByteBuf buf = new CompositeByteBuf(buffers); + assertEquals(1, buf.getReferenceCount()); + + buf.retain(); + assertEquals(2, buf.getReferenceCount()); + + buf.release(); + assertEquals(1, buf.getReferenceCount()); + + buf.release(); + assertEquals(0, buf.getReferenceCount()); + + assertThrows(IllegalStateException.class, buf::release); + assertThrows(IllegalStateException.class, buf::retain); + } + + private static Stream getBuffers() { + return Stream.of( + Arguments.of(Named.of("ByteBufNIO", + asList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})), + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}))))), + Arguments.of(Named.of("NettyByteBuf", + asList(new NettyByteBuf(copiedBuffer(new byte[]{1, 2, 3, 4})), + new NettyByteBuf(wrappedBuffer(new byte[]{1, 2, 3, 4}))))), + Arguments.of(Named.of("Mixed NIO and NettyByteBuf", + asList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})), + new NettyByteBuf(wrappedBuffer(new byte[]{1, 2, 3, 4}))))) + ); + } + + @Test + void orderShouldThrowIfNotLittleEndian() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))); + assertThrows(UnsupportedOperationException.class, () -> buf.order(ByteOrder.BIG_ENDIAN)); + } + + @Test + void orderShouldReturnNormallyIfLittleEndian() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))); + assertDoesNotThrow(() -> buf.order(ByteOrder.LITTLE_ENDIAN)); + } + + @Test + void limitShouldBeSumOfLimitsOfBuffers() { + assertEquals(4, new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))).limit()); + + assertEquals(6, new CompositeByteBuf(asList( + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})), + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2})) + )).limit()); + } + + @Test + void capacityShouldBeTheInitialLimit() { + assertEquals(4, new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))).capacity()); + assertEquals(6, new CompositeByteBuf(asList( + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})), + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2})) + )).capacity()); + } + + @Test + void positionShouldBeZero() { + assertEquals(0, new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))).position()); + } + + @Test + void positionShouldBeSetIfInRange() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3})))); + buf.position(0); + assertEquals(0, buf.position()); + + buf.position(1); + assertEquals(1, buf.position()); + + buf.position(2); + assertEquals(2, buf.position()); + + buf.position(3); + assertEquals(3, buf.position()); + } + + @Test + void positionShouldThrowIfOutOfRange() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3})))); + + assertThrows(IndexOutOfBoundsException.class, () -> buf.position(-1)); + assertThrows(IndexOutOfBoundsException.class, () -> buf.position(4)); + + buf.limit(2); + assertThrows(IndexOutOfBoundsException.class, () -> buf.position(3)); + } + + @Test + void limitShouldBeSetIfInRange() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3})))); + buf.limit(0); + assertEquals(0, buf.limit()); + + buf.limit(1); + assertEquals(1, buf.limit()); + + buf.limit(2); + assertEquals(2, buf.limit()); + + buf.limit(3); + assertEquals(3, buf.limit()); + } + + @Test + void limitShouldThrowIfOutOfRange() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3})))); + + assertThrows(IndexOutOfBoundsException.class, () -> buf.limit(-1)); + assertThrows(IndexOutOfBoundsException.class, () -> buf.limit(4)); + } + + @Test + void clearShouldResetPositionAndLimit() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3})))); + buf.limit(2); + buf.get(); + buf.clear(); + + assertEquals(0, buf.position()); + assertEquals(3, buf.limit()); + } + + @Test + void duplicateShouldCopyAllProperties() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 1, 2, 3, 4, 1, 2})))); + buf.limit(6); + buf.get(); + buf.get(); + CompositeByteBuf duplicate = (CompositeByteBuf) buf.duplicate(); + + assertEquals(2, duplicate.position()); + assertEquals(6, duplicate.limit()); + assertEquals(67305985, duplicate.getInt()); + assertFalse(duplicate.hasRemaining()); + assertEquals(2, buf.position()); + } + + @Test + void positionRemainingAndHasRemainingShouldUpdateAsBytesAreRead() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))); + assertEquals(0, buf.position()); + assertEquals(4, buf.remaining()); + assertTrue(buf.hasRemaining()); + + buf.get(); + assertEquals(1, buf.position()); + assertEquals(3, buf.remaining()); + assertTrue(buf.hasRemaining()); + + buf.get(); + assertEquals(2, buf.position()); + assertEquals(2, buf.remaining()); + assertTrue(buf.hasRemaining()); + + buf.get(); + assertEquals(3, buf.position()); + assertEquals(1, buf.remaining()); + assertTrue(buf.hasRemaining()); + + buf.get(); + assertEquals(4, buf.position()); + assertEquals(0, buf.remaining()); + assertFalse(buf.hasRemaining()); + } + + @Test + void absoluteGetIntShouldReadLittleEndianIntegerAndPreservePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + int i = buf.getInt(0); + + assertEquals(67305985, i); + assertEquals(0, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteGetIntShouldReadLittleEndianIntegerWhenIntegerIsSplitAcrossBuffers() { + ByteBufNIO byteBufferOne = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2})); + ByteBuf byteBufferTwo = new NettyByteBuf(wrappedBuffer(new byte[]{3, 4})).flip(); + CompositeByteBuf buf = new CompositeByteBuf(asList(byteBufferOne, byteBufferTwo)); + int i = buf.getInt(0); + + assertEquals(67305985, i); + assertEquals(0, buf.position()); + assertEquals(0, byteBufferOne.position()); + assertEquals(0, byteBufferTwo.position()); + } + + @Test + void relativeGetIntShouldReadLittleEndianIntegerAndMovePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + int i = buf.getInt(); + assertEquals(67305985, i); + assertEquals(4, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteGetLongShouldReadLittleEndianLongAndPreservePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + long l = buf.getLong(0); + + assertEquals(578437695752307201L, l); + assertEquals(0, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteGetLongShouldReadLittleEndianLongWhenDoubleIsSplitAcrossBuffers() { + ByteBuf byteBufferOne = new NettyByteBuf(wrappedBuffer(new byte[]{1, 2})).flip(); + ByteBuf byteBufferTwo = new ByteBufNIO(ByteBuffer.wrap(new byte[]{3, 4})); + ByteBuf byteBufferThree = new NettyByteBuf(wrappedBuffer(new byte[]{5, 6})).flip(); + ByteBuf byteBufferFour = new ByteBufNIO(ByteBuffer.wrap(new byte[]{7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(asList(byteBufferOne, byteBufferTwo, byteBufferThree, byteBufferFour)); + long l = buf.getLong(0); + + assertEquals(578437695752307201L, l); + assertEquals(0, buf.position()); + assertEquals(0, byteBufferOne.position()); + assertEquals(0, byteBufferTwo.position()); + } + + @Test + void relativeGetLongShouldReadLittleEndianLongAndMovePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + long l = buf.getLong(); + + assertEquals(578437695752307201L, l); + assertEquals(8, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteGetDoubleShouldReadLittleEndianDoubleAndPreservePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + double d = buf.getDouble(0); + + assertEquals(5.447603722011605E-270, d); + assertEquals(0, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void relativeGetDoubleShouldReadLittleEndianDoubleAndMovePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + double d = buf.getDouble(); + + assertEquals(5.447603722011605E-270, d); + assertEquals(8, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteBulkGetShouldReadBytesAndPreservePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + byte[] bytes = new byte[4]; + buf.get(0, bytes); + + assertArrayEquals(new byte[]{1, 2, 3, 4}, bytes); + assertEquals(0, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void absoluteBulkGetShouldReadBytesWhenSplitAcrossBuffers() { + ByteBufNIO byteBufferOne = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1})); + ByteBufNIO byteBufferTwo = new ByteBufNIO(ByteBuffer.wrap(new byte[]{2, 3})); + ByteBufNIO byteBufferThree = new ByteBufNIO(ByteBuffer.wrap(new byte[]{4, 5, 6})); + ByteBufNIO byteBufferFour = new ByteBufNIO(ByteBuffer.wrap(new byte[]{7, 8, 9, 10})); + ByteBufNIO byteBufferFive = new ByteBufNIO(ByteBuffer.wrap(new byte[]{11})); + ByteBufNIO byteBufferSix = new ByteBufNIO(ByteBuffer.wrap(new byte[]{12})); + CompositeByteBuf buf = new CompositeByteBuf(asList( + byteBufferOne, byteBufferTwo, byteBufferThree, byteBufferFour, byteBufferFive, byteBufferSix)); + + byte[] bytes = new byte[16]; + buf.get(2, bytes, 4, 9); + assertArrayEquals(new byte[]{0, 0, 0, 0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0}, bytes); + assertEquals(0, buf.position()); + } + + @Test + void relativeBulkGetShouldReadBytesAndMovePosition() { + ByteBufNIO byteBuffer = new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})); + CompositeByteBuf buf = new CompositeByteBuf(singletonList(byteBuffer)); + byte[] bytes = new byte[4]; + buf.get(bytes); + + assertArrayEquals(new byte[]{1, 2, 3, 4}, bytes); + assertEquals(4, buf.position()); + assertEquals(0, byteBuffer.position()); + + bytes = new byte[8]; + buf.get(bytes, 4, 3); + assertArrayEquals(new byte[]{0, 0, 0, 0, 5, 6, 7, 0}, bytes); + assertEquals(7, buf.position()); + assertEquals(0, byteBuffer.position()); + } + + @Test + void shouldGetAsNIOByteBuffer() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})))); + buf.position(1).limit(5); + ByteBuffer nio = buf.asNIO(); + + assertEquals(1, nio.position()); + assertEquals(5, nio.limit()); + + byte[] bytes = new byte[4]; + nio.get(bytes); + assertArrayEquals(new byte[]{2, 3, 4, 5}, bytes); + } + + @Test + void shouldGetAsNIOByteBufferWithMultipleBuffers() { + CompositeByteBuf buf = new CompositeByteBuf(asList( + new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2})), + new NettyByteBuf(wrappedBuffer(new byte[]{3, 4, 5})).flip(), + new ByteBufNIO(ByteBuffer.wrap(new byte[]{6, 7, 8, 9})) + )); + buf.position(1).limit(6); + ByteBuffer nio = buf.asNIO(); + + assertEquals(0, nio.position()); + assertEquals(5, nio.limit()); + + byte[] bytes = new byte[5]; + nio.get(bytes); + assertArrayEquals(new byte[]{2, 3, 4, 5, 6}, bytes); + } + + @Test + void shouldThrowIndexOutOfBoundsExceptionIfReadingOutOfBounds() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))); + buf.position(4); + + assertThrows(IndexOutOfBoundsException.class, buf::get); + buf.position(1); + + assertThrows(IndexOutOfBoundsException.class, buf::getInt); + buf.position(0); + + assertThrows(IndexOutOfBoundsException.class, () -> buf.get(new byte[2], 1, 2)); + } + + @Test + void shouldThrowIllegalStateExceptionIfBufferIsClosed() { + CompositeByteBuf buf = new CompositeByteBuf(singletonList(new ByteBufNIO(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})))); + buf.release(); + + assertThrows(IllegalStateException.class, buf::get); + } +}