4444import reactor .util .context .Context ;
4545import reactor .util .context .ContextView ;
4646
47- import java .util .concurrent .atomic .AtomicBoolean ;
47+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
4848import java .util .function .BiConsumer ;
4949import java .util .function .Function ;
5050
@@ -62,6 +62,17 @@ final class ReactorNettyClient implements Client {
6262
6363 private static final boolean INFO_ENABLED = logger .isInfoEnabled ();
6464
65+ private static final int ST_CONNECTED = 0 ;
66+
67+ private static final int ST_CLOSING = 1 ;
68+
69+ private static final int ST_CLOSED = 2 ;
70+
71+ private static final AtomicIntegerFieldUpdater <ReactorNettyClient > STATE_UPDATER =
72+ AtomicIntegerFieldUpdater .newUpdater (ReactorNettyClient .class , "state" );
73+
74+ private volatile int state = ST_CONNECTED ;
75+
6576 private final Connection connection ;
6677
6778 private final ConnectionContext context ;
@@ -73,8 +84,6 @@ final class ReactorNettyClient implements Client {
7384
7485 private final RequestQueue requestQueue = new RequestQueue ();
7586
76- private final AtomicBoolean closing = new AtomicBoolean ();
77-
7887 ReactorNettyClient (Connection connection , MySqlSslConfiguration ssl , ConnectionContext context ) {
7988 requireNonNull (connection , "connection must not be null" );
8089 requireNonNull (context , "context must not be null" );
@@ -88,7 +97,7 @@ final class ReactorNettyClient implements Client {
8897 // Note: encoder/decoder should before reactor bridge.
8998 connection .addHandlerLast (EnvelopeSlicer .NAME , new EnvelopeSlicer ())
9099 .addHandlerLast (MessageDuplexCodec .NAME ,
91- new MessageDuplexCodec (context , this . closing , this . requestQueue ));
100+ new MessageDuplexCodec (context ));
92101
93102 if (ssl .getSslMode ().startSsl ()) {
94103 connection .addHandlerFirst (SslBridgeHandler .NAME , new SslBridgeHandler (context , ssl ));
@@ -191,24 +200,36 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
191200
192201 @ Override
193202 public Mono <Void > close () {
194- return Mono .<Mono <Void >>create (sink -> {
195- if (!closing .compareAndSet (false , true )) {
196- // client is closing or closed
197- sink .success ();
198- return ;
199- }
200-
201- requestQueue .submit (RequestTask .wrap (sink , Mono .fromRunnable (() -> {
202- Sinks .EmitResult result = requests .tryEmitNext (ExitMessage .INSTANCE );
203+ return Mono
204+ .<Mono <Void >>create (sink -> {
205+ if (state == ST_CLOSED ) {
206+ logger .debug ("Close request ignored (connection already closed)" );
207+ sink .success ();
208+ return ;
209+ }
203210
204- if (result != Sinks .EmitResult .OK ) {
205- logger .error ("Exit message sending failed due to {}, force closing" , result );
206- }
207- })));
208- }).flatMap (Function .identity ()).onErrorResume (e -> {
209- logger .error ("Exit message sending failed, force closing" , e );
210- return Mono .empty ();
211- }).then (forceClose ());
211+ logger .debug ("Close request accepted" );
212+
213+ requestQueue .submit (RequestTask .wrap (sink , Mono .fromRunnable (() -> {
214+ Sinks .EmitResult result = requests .tryEmitNext (ExitMessage .INSTANCE );
215+
216+ if (result != Sinks .EmitResult .OK ) {
217+ logger .error ("Exit message sending failed due to {}, force closing" , result );
218+ } else {
219+ if (STATE_UPDATER .compareAndSet (this , ST_CONNECTED , ST_CLOSING )) {
220+ logger .debug ("Exit message sent" );
221+ } else {
222+ logger .debug ("Exit message sent (duplicated / connection already closed)" );
223+ }
224+ }
225+ })));
226+ })
227+ .flatMap (Function .identity ())
228+ .onErrorResume (e -> {
229+ logger .error ("Exit message sending failed, force closing" , e );
230+ return Mono .empty ();
231+ })
232+ .then (forceClose ());
212233 }
213234
214235 @ Override
@@ -223,7 +244,7 @@ public ByteBufAllocator getByteBufAllocator() {
223244
224245 @ Override
225246 public boolean isConnected () {
226- return ! closing . get () && connection .channel ().isOpen ();
247+ return state < ST_CLOSED && connection .channel ().isOpen ();
227248 }
228249
229250 @ Override
@@ -239,7 +260,7 @@ public void loginSuccess() {
239260 @ Override
240261 public String toString () {
241262 return String .format ("ReactorNettyClient(%s){connectionId=%d}" ,
242- this . closing . get () ? "closing or closed " : "activating " , context .getConnectionId ());
263+ isConnected () ? "activating " : "clsoing or closed " , context .getConnectionId ());
243264 }
244265
245266 private void emitNextRequest (ClientMessage request ) {
@@ -275,10 +296,19 @@ private void drainError(R2dbcException e) {
275296 }
276297
277298 private void handleClose () {
278- if (this .closing .compareAndSet (false , true )) {
279- logger .warn ("Connection has been closed by peer" );
299+ final int oldState = state ;
300+ if (oldState == ST_CLOSED ) {
301+ logger .debug ("Connection already closed" );
302+ return ;
303+ }
304+
305+ STATE_UPDATER .set (this , ST_CLOSED );
306+
307+ if (oldState != ST_CLOSING ) {
308+ logger .debug ("Connection has been closed by peer" );
280309 drainError (ClientExceptions .unexpectedClosed ());
281310 } else {
311+ logger .debug ("Connection has been closed" );
282312 drainError (ClientExceptions .expectedClosed ());
283313 }
284314 }
0 commit comments