Skip to content

Commit 878210d

Browse files
authored
Fix Connection prematurely closed (#144)
Motivation: The existing `ReactorNettyClient#close()` method might not handle requests asynchronously, potentially resulting in premature connection closure. Modification: Make sure that ReactorNettyClient#close() handles requests asynchronously. Result: Resolves #139
1 parent 4062ac8 commit 878210d

File tree

2 files changed

+56
-42
lines changed

2 files changed

+56
-42
lines changed

src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import io.netty.util.internal.logging.InternalLoggerFactory;
4343
import reactor.core.publisher.Flux;
4444

45-
import java.util.concurrent.atomic.AtomicBoolean;
46-
4745
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
4846

4947
/**
@@ -59,16 +57,10 @@ final class MessageDuplexCodec extends ChannelDuplexHandler {
5957

6058
private final ConnectionContext context;
6159

62-
private final AtomicBoolean closing;
63-
64-
private final RequestQueue requestQueue;
65-
6660
private final ServerMessageDecoder decoder = new ServerMessageDecoder();
6761

68-
MessageDuplexCodec(ConnectionContext context, AtomicBoolean closing, RequestQueue requestQueue) {
62+
MessageDuplexCodec(ConnectionContext context) {
6963
this.context = requireNonNull(context, "context must not be null");
70-
this.closing = requireNonNull(closing, "closing must not be null");
71-
this.requestQueue = requireNonNull(requestQueue, "requestQueue must not be null");
7264
}
7365

7466
@Override
@@ -129,14 +121,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
129121
@Override
130122
public void channelInactive(ChannelHandlerContext ctx) {
131123
decoder.dispose();
132-
requestQueue.dispose();
133-
134-
// Server has closed the connection without us wanting to close it
135-
// Typically happens if we send data asynchronously (i.e. previous command didn't complete).
136-
if (closing.compareAndSet(false, true)) {
137-
logger.warn("Connection has been closed by peer");
138-
}
139-
140124
ctx.fireChannelInactive();
141125
}
142126

src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import reactor.util.context.Context;
4545
import reactor.util.context.ContextView;
4646

47-
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4848
import java.util.function.BiConsumer;
4949
import java.util.function.Function;
5050

@@ -62,6 +62,17 @@ final class ReactorNettyClient implements Client {
6262

6363
private static final boolean INFO_ENABLED = logger.isInfoEnabled();
6464

65+
private static final int ST_CONNECTED = 0;
66+
67+
private static final int ST_CLOSING = 1;
68+
69+
private static final int ST_CLOSED = 2;
70+
71+
private static final AtomicIntegerFieldUpdater<ReactorNettyClient> STATE_UPDATER =
72+
AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state");
73+
74+
private volatile int state = ST_CONNECTED;
75+
6576
private final Connection connection;
6677

6778
private final ConnectionContext context;
@@ -73,8 +84,6 @@ final class ReactorNettyClient implements Client {
7384

7485
private final RequestQueue requestQueue = new RequestQueue();
7586

76-
private final AtomicBoolean closing = new AtomicBoolean();
77-
7887
ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
7988
requireNonNull(connection, "connection must not be null");
8089
requireNonNull(context, "context must not be null");
@@ -88,7 +97,7 @@ final class ReactorNettyClient implements Client {
8897
// Note: encoder/decoder should before reactor bridge.
8998
connection.addHandlerLast(EnvelopeSlicer.NAME, new EnvelopeSlicer())
9099
.addHandlerLast(MessageDuplexCodec.NAME,
91-
new MessageDuplexCodec(context, this.closing, this.requestQueue));
100+
new MessageDuplexCodec(context));
92101

93102
if (ssl.getSslMode().startSsl()) {
94103
connection.addHandlerFirst(SslBridgeHandler.NAME, new SslBridgeHandler(context, ssl));
@@ -191,24 +200,36 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
191200

192201
@Override
193202
public Mono<Void> close() {
194-
return Mono.<Mono<Void>>create(sink -> {
195-
if (!closing.compareAndSet(false, true)) {
196-
// client is closing or closed
197-
sink.success();
198-
return;
199-
}
200-
201-
requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
202-
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
203+
return Mono
204+
.<Mono<Void>>create(sink -> {
205+
if (state == ST_CLOSED) {
206+
logger.debug("Close request ignored (connection already closed)");
207+
sink.success();
208+
return;
209+
}
203210

204-
if (result != Sinks.EmitResult.OK) {
205-
logger.error("Exit message sending failed due to {}, force closing", result);
206-
}
207-
})));
208-
}).flatMap(Function.identity()).onErrorResume(e -> {
209-
logger.error("Exit message sending failed, force closing", e);
210-
return Mono.empty();
211-
}).then(forceClose());
211+
logger.debug("Close request accepted");
212+
213+
requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
214+
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
215+
216+
if (result != Sinks.EmitResult.OK) {
217+
logger.error("Exit message sending failed due to {}, force closing", result);
218+
} else {
219+
if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
220+
logger.debug("Exit message sent");
221+
} else {
222+
logger.debug("Exit message sent (duplicated / connection already closed)");
223+
}
224+
}
225+
})));
226+
})
227+
.flatMap(Function.identity())
228+
.onErrorResume(e -> {
229+
logger.error("Exit message sending failed, force closing", e);
230+
return Mono.empty();
231+
})
232+
.then(forceClose());
212233
}
213234

214235
@Override
@@ -223,7 +244,7 @@ public ByteBufAllocator getByteBufAllocator() {
223244

224245
@Override
225246
public boolean isConnected() {
226-
return !closing.get() && connection.channel().isOpen();
247+
return state < ST_CLOSED && connection.channel().isOpen();
227248
}
228249

229250
@Override
@@ -239,7 +260,7 @@ public void loginSuccess() {
239260
@Override
240261
public String toString() {
241262
return String.format("ReactorNettyClient(%s){connectionId=%d}",
242-
this.closing.get() ? "closing or closed" : "activating", context.getConnectionId());
263+
isConnected() ? "activating" : "clsoing or closed", context.getConnectionId());
243264
}
244265

245266
private void emitNextRequest(ClientMessage request) {
@@ -275,10 +296,19 @@ private void drainError(R2dbcException e) {
275296
}
276297

277298
private void handleClose() {
278-
if (this.closing.compareAndSet(false, true)) {
279-
logger.warn("Connection has been closed by peer");
299+
final int oldState = state;
300+
if (oldState == ST_CLOSED) {
301+
logger.debug("Connection already closed");
302+
return;
303+
}
304+
305+
STATE_UPDATER.set(this, ST_CLOSED);
306+
307+
if (oldState != ST_CLOSING) {
308+
logger.debug("Connection has been closed by peer");
280309
drainError(ClientExceptions.unexpectedClosed());
281310
} else {
311+
logger.debug("Connection has been closed");
282312
drainError(ClientExceptions.expectedClosed());
283313
}
284314
}

0 commit comments

Comments
 (0)