2424import com .mongodb .MongoSocketOpenException ;
2525import com .mongodb .MongoSocketReadTimeoutException ;
2626import com .mongodb .ServerAddress ;
27+ import com .mongodb .annotations .ThreadSafe ;
2728import com .mongodb .connection .AsyncCompletionHandler ;
2829import com .mongodb .connection .SocketSettings ;
2930import com .mongodb .connection .SslSettings ;
3031import com .mongodb .connection .Stream ;
32+ import com .mongodb .lang .Nullable ;
3133import io .netty .bootstrap .Bootstrap ;
3234import io .netty .buffer .ByteBufAllocator ;
3335import io .netty .buffer .CompositeByteBuf ;
3436import io .netty .buffer .PooledByteBufAllocator ;
3537import io .netty .channel .Channel ;
3638import io .netty .channel .ChannelFuture ;
3739import io .netty .channel .ChannelFutureListener ;
38- import io .netty .channel .ChannelHandler ;
3940import io .netty .channel .ChannelHandlerContext ;
41+ import io .netty .channel .ChannelInboundHandlerAdapter ;
4042import io .netty .channel .ChannelInitializer ;
4143import io .netty .channel .ChannelOption ;
44+ import io .netty .channel .ChannelPipeline ;
4245import io .netty .channel .EventLoopGroup ;
4346import io .netty .channel .SimpleChannelInboundHandler ;
4447import io .netty .channel .socket .SocketChannel ;
4548import io .netty .handler .ssl .SslHandler ;
4649import io .netty .handler .timeout .ReadTimeoutException ;
47- import io .netty .util .concurrent .EventExecutor ;
4850import org .bson .ByteBuf ;
4951
5052import javax .net .ssl .SSLContext ;
5860import java .util .List ;
5961import java .util .Queue ;
6062import java .util .concurrent .CountDownLatch ;
63+ import java .util .concurrent .Future ;
64+ import java .util .concurrent .ScheduledFuture ;
6165
6266import static com .mongodb .internal .connection .SslHelper .enableHostNameVerification ;
6367import static com .mongodb .internal .connection .SslHelper .enableSni ;
6468import static java .util .concurrent .TimeUnit .MILLISECONDS ;
6569
6670/**
6771 * A Stream implementation based on Netty 4.0.
72+ * Just like it is for the {@link java.nio.channels.AsynchronousSocketChannel},
73+ * concurrent pending<sup>1</sup> readers
74+ * (whether {@linkplain #read(int, int) synchronous} or {@linkplain #readAsync(int, AsyncCompletionHandler) asynchronous})
75+ * are not supported by {@link NettyStream}.
76+ * However, this class does not have a fail-fast mechanism checking for such situations.
77+ * <hr>
78+ * <sup>1</sup>We cannot simply say that read methods are not allowed be run concurrently because strictly speaking they are allowed,
79+ * as explained below.
80+ * <pre>{@code
81+ * NettyStream stream = ...;
82+ * stream.readAsync(1, new AsyncCompletionHandler<ByteBuf>() {//inv1
83+ * @Override
84+ * public void completed(ByteBuf o) {
85+ * stream.readAsync(//inv2
86+ * 1, ...);//ret2
87+ * }
88+ *
89+ * @Override
90+ * public void failed(Throwable t) {
91+ * }
92+ * });//ret1
93+ * }</pre>
94+ * Arrows on the diagram below represent happens-before relations.
95+ * <pre>{@code
96+ * int1 -> inv2 -> ret2
97+ * \--------> ret1
98+ * }</pre>
99+ * As shown on the diagram, the method {@link #readAsync(int, AsyncCompletionHandler)} runs concurrently with
100+ * itself in the example above. However, there are no concurrent pending readers because the second operation
101+ * is invoked after the first operation has completed reading despite the method has not returned yet.
68102 */
69103final class NettyStream implements Stream {
70- private static final String READ_HANDLER_NAME = "ReadTimeoutHandler" ;
104+ private static final byte NO_SCHEDULE_TIME = 0 ;
71105 private final ServerAddress address ;
72106 private final SocketSettings settings ;
73107 private final SslSettings sslSettings ;
@@ -79,8 +113,19 @@ final class NettyStream implements Stream {
79113 private volatile Channel channel ;
80114
81115 private final LinkedList <io .netty .buffer .ByteBuf > pendingInboundBuffers = new LinkedList <io .netty .buffer .ByteBuf >();
82- private volatile PendingReader pendingReader ;
83- private volatile Throwable pendingException ;
116+ /* The fields pendingReader, pendingException are always written/read inside synchronized blocks
117+ * that use the same NettyStream object, so they can be plain.*/
118+ private PendingReader pendingReader ;
119+ private Throwable pendingException ;
120+ /* The fields readTimeoutTask, readTimeoutMillis are each written only in the ChannelInitializer.initChannel method
121+ * (in addition to the write of the default value and the write by variable initializers),
122+ * and read only when NettyStream users read data, or Netty event loop handles incoming data.
123+ * Since actions done by the ChannelInitializer.initChannel method
124+ * are ordered (in the happens-before order) before user read actions and before event loop actions that handle incoming data,
125+ * these fields can be plain.*/
126+ @ Nullable
127+ private ReadTimeoutTask readTimeoutTask ;
128+ private long readTimeoutMillis = NO_SCHEDULE_TIME ;
84129
85130 NettyStream (final ServerAddress address , final SocketSettings settings , final SslSettings sslSettings , final EventLoopGroup workerGroup ,
86131 final Class <? extends SocketChannel > socketChannelClass , final ByteBufAllocator allocator ) {
@@ -135,6 +180,7 @@ private void initializeChannel(final AsyncCompletionHandler<Void> handler, final
135180 bootstrap .handler (new ChannelInitializer <SocketChannel >() {
136181 @ Override
137182 public void initChannel (final SocketChannel ch ) {
183+ ChannelPipeline pipeline = ch .pipeline ();
138184 if (sslSettings .isEnabled ()) {
139185 SSLEngine engine = getSslContext ().createSSLEngine (address .getHost (), address .getPort ());
140186 engine .setUseClientMode (true );
@@ -144,13 +190,20 @@ public void initChannel(final SocketChannel ch) {
144190 enableHostNameVerification (sslParameters );
145191 }
146192 engine .setSSLParameters (sslParameters );
147- ch . pipeline () .addFirst ("ssl" , new SslHandler (engine , false ));
193+ pipeline .addFirst ("ssl" , new SslHandler (engine , false ));
148194 }
195+
149196 int readTimeout = settings .getReadTimeout (MILLISECONDS );
150- if (readTimeout > 0 ) {
151- ch .pipeline ().addLast (READ_HANDLER_NAME , new ReadTimeoutHandler (readTimeout ));
197+ if (readTimeout > NO_SCHEDULE_TIME ) {
198+ readTimeoutMillis = readTimeout ;
199+ /* We need at least one handler before (in the inbound evaluation order) the InboundBufferHandler,
200+ * so that we can fire exception events (they are inbound events) using its context and the InboundBufferHandler
201+ * receives them. SslHandler is not always present, so adding a NOOP handler.*/
202+ pipeline .addLast (new ChannelInboundHandlerAdapter ());
203+ readTimeoutTask = new ReadTimeoutTask (pipeline .lastContext ());
152204 }
153- ch .pipeline ().addLast (new InboundBufferHandler ());
205+
206+ pipeline .addLast (new InboundBufferHandler ());
154207 }
155208 });
156209 final ChannelFuture channelFuture = bootstrap .connect (nextAddress );
@@ -193,14 +246,27 @@ public void operationComplete(final ChannelFuture future) throws Exception {
193246
194247 @ Override
195248 public void readAsync (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler ) {
196- scheduleReadTimeout ();
249+ readAsync (numBytes , handler , readTimeoutMillis );
250+ }
251+
252+ /**
253+ * @param numBytes Must be equal to {@link #pendingReader}{@code .numBytes} when called by a Netty channel handler.
254+ * @param handler Must be equal to {@link #pendingReader}{@code .handler} when called by a Netty channel handler.
255+ * @param readTimeoutMillis Must be equal to {@link #NO_SCHEDULE_TIME} when called by a Netty channel handler.
256+ * Timeouts may be scheduled only by the public read methods. Taking into account that concurrent pending
257+ * readers are not allowed, there must not be a situation when threads attempt to schedule a timeout
258+ * before the previous one is either cancelled or completed.
259+ */
260+ private void readAsync (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler , final long readTimeoutMillis ) {
197261 ByteBuf buffer = null ;
198262 Throwable exceptionResult = null ;
199263 synchronized (this ) {
200264 exceptionResult = pendingException ;
201265 if (exceptionResult == null ) {
202266 if (!hasBytesAvailable (numBytes )) {
203- pendingReader = new PendingReader (numBytes , handler );
267+ if (pendingReader == null ) {//called by a public read method
268+ pendingReader = new PendingReader (numBytes , handler , scheduleReadTimeout (readTimeoutTask , readTimeoutMillis ));
269+ }
204270 } else {
205271 CompositeByteBuf composite = allocator .compositeBuffer (pendingInboundBuffers .size ());
206272 int bytesNeeded = numBytes ;
@@ -223,13 +289,16 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
223289 buffer = new NettyByteBuf (composite ).flip ();
224290 }
225291 }
292+ if (!(exceptionResult == null && buffer == null )//the read operation has completed
293+ && pendingReader != null ) {//we need to clear the pending reader
294+ cancel (pendingReader .timeout );
295+ this .pendingReader = null ;
296+ }
226297 }
227298 if (exceptionResult != null ) {
228- disableReadTimeout ();
229299 handler .failed (exceptionResult );
230300 }
231301 if (buffer != null ) {
232- disableReadTimeout ();
233302 handler .completed (buffer );
234303 }
235304 }
@@ -253,14 +322,12 @@ private void handleReadResponse(final io.netty.buffer.ByteBuf buffer, final Thro
253322 } else {
254323 pendingException = t ;
255324 }
256- if (pendingReader != null ) {
257- localPendingReader = pendingReader ;
258- pendingReader = null ;
259- }
325+ localPendingReader = pendingReader ;
260326 }
261327
262328 if (localPendingReader != null ) {
263- readAsync (localPendingReader .numBytes , localPendingReader .handler );
329+ //timeouts may be scheduled only by the public read methods
330+ readAsync (localPendingReader .numBytes , localPendingReader .handler , NO_SCHEDULE_TIME );
264331 }
265332 }
266333
@@ -336,10 +403,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable t)
336403 private static final class PendingReader {
337404 private final int numBytes ;
338405 private final AsyncCompletionHandler <ByteBuf > handler ;
406+ @ Nullable
407+ private final ScheduledFuture <?> timeout ;
339408
340- private PendingReader (final int numBytes , final AsyncCompletionHandler <ByteBuf > handler ) {
409+ private PendingReader (
410+ final int numBytes , final AsyncCompletionHandler <ByteBuf > handler , @ Nullable final ScheduledFuture <?> timeout ) {
341411 this .numBytes = numBytes ;
342412 this .handler = handler ;
413+ this .timeout = timeout ;
343414 }
344415 }
345416
@@ -423,44 +494,52 @@ public void operationComplete(final ChannelFuture future) {
423494 }
424495 }
425496
426- private void scheduleReadTimeout () {
427- adjustTimeout (false );
497+ private static void cancel (@ Nullable final Future <?> f ) {
498+ if (f != null ) {
499+ f .cancel (false );
500+ }
428501 }
429502
430- private void disableReadTimeout () {
431- adjustTimeout (true );
503+ private static long combinedTimeout (final long timeout , final int additionalTimeout ) {
504+ if (timeout == NO_SCHEDULE_TIME ) {
505+ return NO_SCHEDULE_TIME ;
506+ } else {
507+ return Math .addExact (timeout , additionalTimeout );
508+ }
432509 }
433510
434- private void adjustTimeout (final boolean disable ) {
435- ChannelHandler timeoutHandler = channel .pipeline ().get (READ_HANDLER_NAME );
436- if (timeoutHandler != null ) {
437- final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler ) timeoutHandler ;
438- final ChannelHandlerContext handlerContext = channel .pipeline ().context (timeoutHandler );
439- EventExecutor executor = handlerContext .executor ();
511+ private static ScheduledFuture <?> scheduleReadTimeout (@ Nullable final ReadTimeoutTask readTimeoutTask , final long timeoutMillis ) {
512+ if (timeoutMillis == NO_SCHEDULE_TIME ) {
513+ return null ;
514+ } else {
515+ //assert readTimeoutTask != null : "readTimeoutTask must be initialized if read timeouts are enabled";
516+ return readTimeoutTask .schedule (timeoutMillis );
517+ }
518+ }
440519
441- if (disable ) {
442- if (executor .inEventLoop ()) {
443- readTimeoutHandler .removeTimeout (handlerContext );
444- } else {
445- executor .submit (new Runnable () {
446- @ Override
447- public void run () {
448- readTimeoutHandler .removeTimeout (handlerContext );
449- }
450- });
451- }
452- } else {
453- if (executor .inEventLoop ()) {
454- readTimeoutHandler .scheduleTimeout (handlerContext );
455- } else {
456- executor .submit (new Runnable () {
457- @ Override
458- public void run () {
459- readTimeoutHandler .scheduleTimeout (handlerContext );
460- }
461- });
462- }
520+ @ ThreadSafe
521+ private static final class ReadTimeoutTask implements Runnable {
522+ private final ChannelHandlerContext ctx ;
523+
524+ private ReadTimeoutTask (final ChannelHandlerContext timeoutChannelHandlerContext ) {
525+ ctx = timeoutChannelHandlerContext ;
526+ }
527+
528+ @ Override
529+ public void run () {
530+ try {
531+ if (ctx .channel ().isOpen ()) {
532+ ctx .fireExceptionCaught (ReadTimeoutException .INSTANCE );
533+ ctx .close ();
463534 }
535+ } catch (final Throwable t ) {
536+ ctx .fireExceptionCaught (t );
464537 }
538+ }
539+
540+ private ScheduledFuture <?> schedule (final long timeoutMillis ) {
541+ //assert timeoutMillis > 0 : timeoutMillis;
542+ return ctx .executor ().schedule (this , timeoutMillis , MILLISECONDS );
543+ }
465544 }
466545}
0 commit comments