Skip to content

Commit d3361a9

Browse files
committed
Updated CompositeByteBuf to be responsible for retaining its ByteBufs
* Updated NettyByteBuf so that it does its own reference counting as the internals of Netty can also retain and release the netty ByteBuf implementation. * Updated CommandMessage as CompositeByteBuf handles the releasing of its ByteBufs * Migrated CompositeByteBufSpecification to JUnit 5 and added extra test cases with mixed ByteBuf types not just NIO ones. * Added recording to CommandHelperSpecification as this caught the initial use of Netty doing its own accounting on its ByteBuf implementations. JAVA-5982
1 parent 9b98ab5 commit d3361a9

File tree

6 files changed

+483
-581
lines changed

6 files changed

+483
-581
lines changed

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -148,48 +148,43 @@ public final class CommandMessage extends RequestMessage {
148148
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections.
149149
*/
150150
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
151-
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
151+
CompositeByteBuf byteBuf = new CompositeByteBuf(bsonOutput.getByteBuffers());
152152
try {
153-
CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers);
154-
try {
155-
byteBuf.position(firstDocumentPosition);
156-
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);
157-
158-
// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
159-
if (byteBuf.hasRemaining()) {
160-
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();
161-
162-
// Each loop iteration processes one Document Sequence
163-
// When there are no more bytes remaining, there are no more Document Sequences
164-
while (byteBuf.hasRemaining()) {
165-
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
166-
byteBuf.position(byteBuf.position() + 1);
167-
int sequenceStart = byteBuf.position();
168-
int sequenceSizeInBytes = byteBuf.getInt();
169-
int sectionEnd = sequenceStart + sequenceSizeInBytes;
170-
171-
String fieldName = getSequenceIdentifier(byteBuf);
172-
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
173-
// so, this method will need to change in order to append the value to the correct nested document.
174-
assertFalse(fieldName.contains("."));
175-
176-
ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
177-
try {
178-
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
179-
} finally {
180-
documentsByteBufSlice.release();
181-
}
182-
byteBuf.position(sectionEnd);
153+
byteBuf.position(firstDocumentPosition);
154+
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);
155+
156+
// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
157+
if (byteBuf.hasRemaining()) {
158+
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();
159+
160+
// Each loop iteration processes one Document Sequence
161+
// When there are no more bytes remaining, there are no more Document Sequences
162+
while (byteBuf.hasRemaining()) {
163+
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
164+
byteBuf.position(byteBuf.position() + 1);
165+
int sequenceStart = byteBuf.position();
166+
int sequenceSizeInBytes = byteBuf.getInt();
167+
int sectionEnd = sequenceStart + sequenceSizeInBytes;
168+
169+
String fieldName = getSequenceIdentifier(byteBuf);
170+
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
171+
// so, this method will need to change in order to append the value to the correct nested document.
172+
assertFalse(fieldName.contains("."));
173+
174+
ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
175+
try {
176+
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
177+
} finally {
178+
documentsByteBufSlice.release();
183179
}
184-
return commandBsonDocument;
185-
} else {
186-
return byteBufBsonDocument;
180+
byteBuf.position(sectionEnd);
187181
}
188-
} finally {
189-
byteBuf.release();
182+
return commandBsonDocument;
183+
} else {
184+
return byteBufBsonDocument;
190185
}
191186
} finally {
192-
byteBuffers.forEach(ByteBuf::release);
187+
byteBuf.release();
193188
}
194189
}
195190

driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.assertions.Assertions;
1920
import org.bson.ByteBuf;
2021

2122
import java.nio.Buffer;
@@ -50,7 +51,11 @@ class CompositeByteBuf implements ByteBuf {
5051
}
5152

5253
CompositeByteBuf(final CompositeByteBuf from) {
53-
components = from.components;
54+
notNull("from", from);
55+
components = new ArrayList<>(from.components.size());
56+
from.components.forEach(component ->
57+
components.add(new Component(component.buffer.duplicate(), component.offset))
58+
);
5459
position = from.position();
5560
limit = from.limit();
5661
}
@@ -306,6 +311,7 @@ public ByteBuf retain() {
306311
referenceCount.decrementAndGet();
307312
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
308313
}
314+
components.forEach(c -> c.buffer.retain());
309315
return this;
310316
}
311317

@@ -315,6 +321,11 @@ public void release() {
315321
referenceCount.incrementAndGet();
316322
throw new IllegalStateException("Attempted to decrement the reference count below 0");
317323
}
324+
components.forEach(c -> c.buffer.release());
325+
if (referenceCount.get() == 0) {
326+
Assertions.assertTrue(components.stream().noneMatch(c -> c.buffer.getReferenceCount() > 0),
327+
"Some buffers still had a reference to them even though the CompositeByteBuf was fully released");
328+
}
318329
}
319330

320331
private Component findComponent(final int index) {

driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.nio.ByteOrder;
23+
import java.util.concurrent.atomic.AtomicInteger;
2324

2425
/**
2526
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2627
*/
2728
public final class NettyByteBuf implements ByteBuf {
28-
29+
private final AtomicInteger referenceCount = new AtomicInteger(1);
2930
private io.netty.buffer.ByteBuf proxied;
3031
private boolean isWriting = true;
3132

@@ -271,17 +272,25 @@ public ByteBuffer asNIO() {
271272

272273
@Override
273274
public int getReferenceCount() {
274-
return proxied.refCnt();
275+
return referenceCount.get();
275276
}
276277

277278
@Override
278279
public ByteBuf retain() {
280+
if (referenceCount.incrementAndGet() == 1) {
281+
referenceCount.decrementAndGet();
282+
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
283+
}
279284
proxied.retain();
280285
return this;
281286
}
282287

283288
@Override
284289
public void release() {
290+
if (referenceCount.decrementAndGet() < 0) {
291+
referenceCount.incrementAndGet();
292+
throw new IllegalStateException("Attempted to decrement the reference count below 0");
293+
}
285294
proxied.release();
286295
}
287296
}

driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class CommandHelperSpecification extends Specification {
4343
InternalConnection connection
4444

4545
def setup() {
46+
InternalStreamConnection.setRecordEverything(true) // Ensures implementation can log as expected
4647
connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE,
4748
new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()),
4849
getCredentialWithCache(), CLIENT_METADATA, [], LoggerSettings.builder().build(), null, getServerApi())

0 commit comments

Comments
 (0)