2424import com .mongodb .MongoSocketOpenException ;
2525import com .mongodb .MongoSocketReadTimeoutException ;
2626import com .mongodb .ServerAddress ;
27+ import com .mongodb .annotations .ThreadSafe ;
2728import com .mongodb .connection .AsyncCompletionHandler ;
2829import com .mongodb .connection .SocketSettings ;
2930import com .mongodb .connection .SslSettings ;
3031import com .mongodb .connection .Stream ;
32+ import com .mongodb .lang .Nullable ;
3133import io .netty .bootstrap .Bootstrap ;
3234import io .netty .buffer .ByteBufAllocator ;
3335import io .netty .buffer .CompositeByteBuf ;
3436import io .netty .buffer .PooledByteBufAllocator ;
3537import io .netty .channel .Channel ;
3638import io .netty .channel .ChannelFuture ;
3739import io .netty .channel .ChannelFutureListener ;
38- import io .netty .channel .ChannelHandler ;
3940import io .netty .channel .ChannelHandlerContext ;
41+ import io .netty .channel .ChannelInboundHandlerAdapter ;
4042import io .netty .channel .ChannelInitializer ;
4143import io .netty .channel .ChannelOption ;
44+ import io .netty .channel .ChannelPipeline ;
4245import io .netty .channel .EventLoopGroup ;
4346import io .netty .channel .SimpleChannelInboundHandler ;
4447import io .netty .channel .socket .SocketChannel ;
4548import io .netty .handler .ssl .SslHandler ;
4649import io .netty .handler .timeout .ReadTimeoutException ;
47- import io .netty .util .concurrent .EventExecutor ;
4850import org .bson .ByteBuf ;
4951
5052import javax .net .ssl .SSLContext ;
5860import java .util .List ;
5961import java .util .Queue ;
6062import java .util .concurrent .CountDownLatch ;
63+ import java .util .concurrent .Future ;
64+ import java .util .concurrent .ScheduledFuture ;
6165
66+ import static com .mongodb .assertions .Assertions .isTrueArgument ;
6267import static com .mongodb .internal .connection .SslHelper .enableHostNameVerification ;
6368import static com .mongodb .internal .connection .SslHelper .enableSni ;
6469import static java .util .concurrent .TimeUnit .MILLISECONDS ;
6570
6671/**
6772 * A Stream implementation based on Netty 4.0.
73+ * Just like it is for the {@link java.nio.channels.AsynchronousSocketChannel},
74+ * concurrent pending<sup>1</sup> readers
75+ * (whether {@linkplain #read(int, int) synchronous} or {@linkplain #readAsync(int, AsyncCompletionHandler) asynchronous})
76+ * are not supported by {@link NettyStream}.
77+ * However, this class does not have a fail-fast mechanism checking for such situations.
78+ * <hr>
79+ * <sup>1</sup>We cannot simply say that read methods are not allowed be run concurrently because strictly speaking they are allowed,
80+ * as explained below.
81+ * <pre>{@code
82+ * NettyStream stream = ...;
83+ * stream.readAsync(1, new AsyncCompletionHandler<ByteBuf>() {//inv1
84+ * @Override
85+ * public void completed(ByteBuf o) {
86+ * stream.readAsync(//inv2
87+ * 1, ...);//ret2
88+ * }
89+ *
90+ * @Override
91+ * public void failed(Throwable t) {
92+ * }
93+ * });//ret1
94+ * }</pre>
95+ * Arrows on the diagram below represent happens-before relations.
96+ * <pre>{@code
97+ * int1 -> inv2 -> ret2
98+ * \--------> ret1
99+ * }</pre>
100+ * As shown on the diagram, the method {@link #readAsync(int, AsyncCompletionHandler)} runs concurrently with
101+ * itself in the example above. However, there are no concurrent pending readers because the second operation
102+ * is invoked after the first operation has completed reading despite the method has not returned yet.
68103 */
69104final class NettyStream implements Stream {
70- private static final String READ_HANDLER_NAME = "ReadTimeoutHandler" ;
105+ private static final byte NO_SCHEDULE_TIME = 0 ;
71106 private final ServerAddress address ;
72107 private final SocketSettings settings ;
73108 private final SslSettings sslSettings ;
74109 private final EventLoopGroup workerGroup ;
75110 private final Class <? extends SocketChannel > socketChannelClass ;
76111 private final ByteBufAllocator allocator ;
77112
78- private volatile boolean isClosed ;
113+ private boolean isClosed ;
79114 private volatile Channel channel ;
80115
81116 private final LinkedList <io .netty .buffer .ByteBuf > pendingInboundBuffers = new LinkedList <io .netty .buffer .ByteBuf >();
82- private volatile PendingReader pendingReader ;
83- private volatile Throwable pendingException ;
117+ /* The fields pendingReader, pendingException are always written/read inside synchronized blocks
118+ * that use the same NettyStream object, so they can be plain.*/
119+ private PendingReader pendingReader ;
120+ private Throwable pendingException ;
121+ /* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
122+ * (in addition to the write of the default value and the write by variable initializers),
123+ * and read only when NettyStream users read data, or Netty event loop handles incoming data.
124+ * Since actions done by the ChannelInitializer.initChannel method
125+ * are ordered (in the happens-before order) before user read actions and before event loop actions that handle incoming data,
126+ * these fields can be plain.*/
127+ @ Nullable
128+ private ReadTimeoutTask readTimeoutTask ;
129+ private long readTimeoutMillis = NO_SCHEDULE_TIME ;
84130
85131 NettyStream (final ServerAddress address , final SocketSettings settings , final SslSettings sslSettings , final EventLoopGroup workerGroup ,
86132 final Class <? extends SocketChannel > socketChannelClass , final ByteBufAllocator allocator ) {
@@ -143,6 +189,7 @@ private void initializeChannel(final AsyncCompletionHandler<Void> handler, final
143189 bootstrap .handler (new ChannelInitializer <SocketChannel >() {
144190 @ Override
145191 public void initChannel (final SocketChannel ch ) {
192+ ChannelPipeline pipeline = ch .pipeline ();
146193 if (sslSettings .isEnabled ()) {
147194 SSLEngine engine = getSslContext ().createSSLEngine (address .getHost (), address .getPort ());
148195 engine .setUseClientMode (true );
@@ -152,13 +199,20 @@ public void initChannel(final SocketChannel ch) {
152199 enableHostNameVerification (sslParameters );
153200 }
154201 engine .setSSLParameters (sslParameters );
155- ch . pipeline () .addFirst ("ssl" , new SslHandler (engine , false ));
202+ pipeline .addFirst ("ssl" , new SslHandler (engine , false ));
156203 }
204+
157205 int readTimeout = settings .getReadTimeout (MILLISECONDS );
158- if (readTimeout > 0 ) {
159- ch .pipeline ().addLast (READ_HANDLER_NAME , new ReadTimeoutHandler (readTimeout ));
206+ if (readTimeout > NO_SCHEDULE_TIME ) {
207+ readTimeoutMillis = readTimeout ;
208+ /* We need at least one handler before (in the inbound evaluation order) the InboundBufferHandler,
209+ * so that we can fire exception events (they are inbound events) using its context and the InboundBufferHandler
210+ * receives them. SslHandler is not always present, so adding a NOOP handler.*/
211+ pipeline .addLast (new ChannelInboundHandlerAdapter ());
212+ readTimeoutTask = new ReadTimeoutTask (pipeline .lastContext ());
160213 }
161- ch .pipeline ().addLast (new InboundBufferHandler ());
214+
215+ pipeline .addLast (new InboundBufferHandler ());
162216 }
163217 });
164218 final ChannelFuture channelFuture = bootstrap .connect (nextAddress );
@@ -184,9 +238,10 @@ public boolean supportsAdditionalTimeout() {
184238 }
185239
186240 @ Override
187- public ByteBuf read (final int numBytes , final int additionalTimeout ) throws IOException {
241+ public ByteBuf read (final int numBytes , final int additionalTimeoutMillis ) throws IOException {
242+ isTrueArgument ("additionalTimeoutMillis must not be negative" , additionalTimeoutMillis >= 0 );
188243 FutureAsyncCompletionHandler <ByteBuf > future = new FutureAsyncCompletionHandler <ByteBuf >();
189- readAsync (numBytes , future , additionalTimeout );
244+ readAsync (numBytes , future , combinedTimeout ( readTimeoutMillis , additionalTimeoutMillis ) );
190245 return future .get ();
191246 }
192247
@@ -211,18 +266,27 @@ public void operationComplete(final ChannelFuture future) throws Exception {
211266
212267 @ Override
213268 public void readAsync (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler ) {
214- readAsync (numBytes , handler , 0 );
269+ readAsync (numBytes , handler , readTimeoutMillis );
215270 }
216271
217- private void readAsync (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler , final int additionalTimeout ) {
218- scheduleReadTimeout (additionalTimeout );
272+ /**
273+ * @param numBytes Must be equal to {@link #pendingReader}{@code .numBytes} when called by a Netty channel handler.
274+ * @param handler Must be equal to {@link #pendingReader}{@code .handler} when called by a Netty channel handler.
275+ * @param readTimeoutMillis Must be equal to {@link #NO_SCHEDULE_TIME} when called by a Netty channel handler.
276+ * Timeouts may be scheduled only by the public read methods. Taking into account that concurrent pending
277+ * readers are not allowed, there must not be a situation when threads attempt to schedule a timeout
278+ * before the previous one is either cancelled or completed.
279+ */
280+ private void readAsync (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler , final long readTimeoutMillis ) {
219281 ByteBuf buffer = null ;
220282 Throwable exceptionResult = null ;
221283 synchronized (this ) {
222284 exceptionResult = pendingException ;
223285 if (exceptionResult == null ) {
224286 if (!hasBytesAvailable (numBytes )) {
225- pendingReader = new PendingReader (numBytes , handler );
287+ if (pendingReader == null ) {//called by a public read method
288+ pendingReader = new PendingReader (numBytes , handler , scheduleReadTimeout (readTimeoutTask , readTimeoutMillis ));
289+ }
226290 } else {
227291 CompositeByteBuf composite = allocator .compositeBuffer (pendingInboundBuffers .size ());
228292 int bytesNeeded = numBytes ;
@@ -245,13 +309,16 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
245309 buffer = new NettyByteBuf (composite ).flip ();
246310 }
247311 }
312+ if (!(exceptionResult == null && buffer == null )//the read operation has completed
313+ && pendingReader != null ) {//we need to clear the pending reader
314+ cancel (pendingReader .timeout );
315+ this .pendingReader = null ;
316+ }
248317 }
249318 if (exceptionResult != null ) {
250- disableReadTimeout ();
251319 handler .failed (exceptionResult );
252320 }
253321 if (buffer != null ) {
254- disableReadTimeout ();
255322 handler .completed (buffer );
256323 }
257324 }
@@ -275,14 +342,12 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro
275342 } else {
276343 pendingException = t ;
277344 }
278- if (pendingReader != null ) {
279- localPendingReader = pendingReader ;
280- pendingReader = null ;
281- }
345+ localPendingReader = pendingReader ;
282346 }
283347
284348 if (localPendingReader != null ) {
285- readAsync (localPendingReader .numBytes , localPendingReader .handler );
349+ //timeouts may be scheduled only by the public read methods
350+ readAsync (localPendingReader .numBytes , localPendingReader .handler , NO_SCHEDULE_TIME );
286351 }
287352 }
288353
@@ -358,10 +423,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t)
358423 private static final class PendingReader {
359424 private final int numBytes ;
360425 private final AsyncCompletionHandler <ByteBuf > handler ;
426+ @ Nullable
427+ private final ScheduledFuture <?> timeout ;
361428
362- private PendingReader (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler ) {
429+ private PendingReader (
430+ final int numBytes , final AsyncCompletionHandler <ByteBuf > handler , @ Nullable final ScheduledFuture <?> timeout ) {
363431 this .numBytes = numBytes ;
364432 this .handler = handler ;
433+ this .timeout = timeout ;
365434 }
366435 }
367436
@@ -445,47 +514,52 @@ public void operationComplete(final ChannelFuture future) {
445514 }
446515 }
447516
448- private void scheduleReadTimeout (final int additionalTimeout ) {
449- adjustTimeout (false , additionalTimeout );
517+ private static void cancel (@ Nullable final Future <?> f ) {
518+ if (f != null ) {
519+ f .cancel (false );
520+ }
450521 }
451522
452- private void disableReadTimeout () {
453- adjustTimeout (true , 0 );
523+ private static long combinedTimeout (final long timeout , final int additionalTimeout ) {
524+ if (timeout == NO_SCHEDULE_TIME ) {
525+ return NO_SCHEDULE_TIME ;
526+ } else {
527+ return Math .addExact (timeout , additionalTimeout );
528+ }
454529 }
455530
456- private void adjustTimeout (final boolean disable , final int additionalTimeout ) {
457- if (isClosed ) {
458- return ;
459- }
460- ChannelHandler timeoutHandler = channel .pipeline ().get (READ_HANDLER_NAME );
461- if (timeoutHandler != null ) {
462- final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler ) timeoutHandler ;
463- final ChannelHandlerContext handlerContext = channel .pipeline ().context (timeoutHandler );
464- EventExecutor executor = handlerContext .executor ();
465-
466- if (disable ) {
467- if (executor .inEventLoop ()) {
468- readTimeoutHandler .removeTimeout (handlerContext );
469- } else {
470- executor .submit (new Runnable () {
471- @ Override
472- public void run () {
473- readTimeoutHandler .removeTimeout (handlerContext );
474- }
475- });
476- }
477- } else {
478- if (executor .inEventLoop ()) {
479- readTimeoutHandler .scheduleTimeout (handlerContext , additionalTimeout );
480- } else {
481- executor .submit (new Runnable () {
482- @ Override
483- public void run () {
484- readTimeoutHandler .scheduleTimeout (handlerContext , additionalTimeout );
485- }
486- });
487- }
531+ private static ScheduledFuture <?> scheduleReadTimeout (@ Nullable final ReadTimeoutTask readTimeoutTask , final long timeoutMillis ) {
532+ if (timeoutMillis == NO_SCHEDULE_TIME ) {
533+ return null ;
534+ } else {
535+ //assert readTimeoutTask != null : "readTimeoutTask must be initialized if read timeouts are enabled";
536+ return readTimeoutTask .schedule (timeoutMillis );
537+ }
538+ }
539+
540+ @ ThreadSafe
541+ private static final class ReadTimeoutTask implements Runnable {
542+ private final ChannelHandlerContext ctx ;
543+
544+ private ReadTimeoutTask (final ChannelHandlerContext timeoutChannelHandlerContext ) {
545+ ctx = timeoutChannelHandlerContext ;
546+ }
547+
548+ @ Override
549+ public void run () {
550+ try {
551+ if (ctx .channel ().isOpen ()) {
552+ ctx .fireExceptionCaught (ReadTimeoutException .INSTANCE );
553+ ctx .close ();
488554 }
555+ } catch (final Throwable t ) {
556+ ctx .fireExceptionCaught (t );
489557 }
558+ }
559+
560+ private ScheduledFuture <?> schedule (final long timeoutMillis ) {
561+ //assert timeoutMillis > 0 : timeoutMillis;
562+ return ctx .executor ().schedule (this , timeoutMillis , MILLISECONDS );
563+ }
490564 }
491565}
0 commit comments