Skip to content

Commit df0a358

Browse files
committed
Add lightweight ping syntax support
1 parent 8ec8b97 commit df0a358

File tree

6 files changed

+135
-21
lines changed

6 files changed

+135
-21
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ This driver provides the following features:
2929
- [x] Secure connection with verification (SSL/TLS), auto-select TLS version for community and enterprise editions.
3030
- [x] SSL tunnel for proxy protocol of MySQL.
3131
- [x] Transactions with savepoint.
32-
- [x] Native ping command that can be verifying when argument is `ValidationDepth.REMOTE`
32+
- [x] Native ping can be sent via `Connection.validate(ValidationDepth.REMOTE)` and the lightweight ping syntax `/* ping */ ...`.
3333
- [x] Extensible, e.g. extend built-in `Codec`(s).
3434
- [x] MariaDB `RETURNING` clause.
3535

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.r2dbc.spi.ValidationDepth;
3939
import org.jetbrains.annotations.Nullable;
4040
import org.reactivestreams.Publisher;
41+
import reactor.core.publisher.Flux;
4142
import reactor.core.publisher.Mono;
4243
import reactor.core.publisher.SynchronousSink;
4344

@@ -61,6 +62,8 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS
6162

6263
private static final int DEFAULT_LOCK_WAIT_TIMEOUT = 50;
6364

65+
private static final String PING_MARKER = "/* ping */";
66+
6467
private static final String ZONE_PREFIX_POSIX = "posix/";
6568

6669
private static final String ZONE_PREFIX_RIGHT = "right/";
@@ -79,15 +82,28 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS
7982

8083
private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true);
8184

82-
private static final BiConsumer<ServerMessage, SynchronousSink<Boolean>> PING = (message, sink) -> {
85+
private static final Function<ServerMessage, Boolean> VALIDATE = message -> {
86+
if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
87+
return true;
88+
}
89+
8390
if (message instanceof ErrorMessage) {
8491
ErrorMessage msg = (ErrorMessage) message;
8592
logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(),
8693
msg.getMessage());
87-
sink.next(false);
94+
} else {
95+
ReferenceCountUtil.safeRelease(message);
96+
}
97+
98+
return false;
99+
};
100+
101+
private static final BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> PING = (message, sink) -> {
102+
if (message instanceof ErrorMessage) {
103+
sink.next(message);
88104
sink.complete();
89105
} else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
90-
sink.next(true);
106+
sink.next(message);
91107
sink.complete();
92108
} else {
93109
ReferenceCountUtil.safeRelease(message);
@@ -190,9 +206,7 @@ public Mono<Void> beginTransaction() {
190206

191207
@Override
192208
public Mono<Void> beginTransaction(TransactionDefinition definition) {
193-
return Mono.defer(() -> {
194-
return QueryFlow.beginTransaction(client, this, batchSupported, definition);
195-
});
209+
return Mono.defer(() -> QueryFlow.beginTransaction(client, this, batchSupported, definition));
196210
}
197211

198212
@Override
@@ -209,9 +223,7 @@ public Mono<Void> close() {
209223

210224
@Override
211225
public Mono<Void> commitTransaction() {
212-
return Mono.defer(() -> {
213-
return QueryFlow.doneTransaction(client, this, true, batchSupported);
214-
});
226+
return Mono.defer(() -> QueryFlow.doneTransaction(client, this, true, batchSupported));
215227
}
216228

217229
@Override
@@ -232,6 +244,10 @@ public Mono<Void> createSavepoint(String name) {
232244
public MySqlStatement createStatement(String sql) {
233245
requireNonNull(sql, "sql must not be null");
234246

247+
if (sql.startsWith(PING_MARKER)) {
248+
return new PingStatement(this, codecs, context);
249+
}
250+
235251
Query query = queryCache.get(sql);
236252

237253
if (query.isSimple()) {
@@ -339,8 +355,9 @@ public Mono<Boolean> validate(ValidationDepth depth) {
339355
return Mono.just(false);
340356
}
341357

342-
return client.exchange(PingMessage.INSTANCE, PING)
358+
return doPingInternal()
343359
.last()
360+
.map(VALIDATE)
344361
.onErrorResume(e -> {
345362
// `last` maybe emit a NoSuchElementException, exchange maybe emit exception by Netty.
346363
// But should NEVER emit any exception, so logging exception and emit false.
@@ -439,6 +456,10 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
439456
);
440457
}
441458

459+
Flux<ServerMessage> doPingInternal() {
460+
return client.exchange(PingMessage.INSTANCE, PING);
461+
}
462+
442463
boolean isSessionAutoCommit() {
443464
return (context.getServerStatuses() & ServerStatuses.AUTO_COMMIT) != 0;
444465
}

src/main/java/io/asyncer/r2dbc/mysql/MySqlStatement.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import io.r2dbc.spi.Statement;
2020
import reactor.core.publisher.Flux;
2121

22+
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
23+
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
24+
2225
/**
2326
* A strongly typed implementation of {@link Statement} for the MySQL database.
2427
*/
@@ -58,17 +61,23 @@ public interface MySqlStatement extends Statement {
5861
* {@inheritDoc}
5962
*/
6063
@Override
61-
MySqlStatement returnGeneratedValues(String... columns);
64+
Flux<MySqlResult> execute();
6265

6366
/**
6467
* {@inheritDoc}
6568
*/
6669
@Override
67-
Flux<MySqlResult> execute();
70+
default MySqlStatement returnGeneratedValues(String... columns) {
71+
requireNonNull(columns, "columns must not be null");
72+
return this;
73+
}
6874

6975
/**
7076
* {@inheritDoc}
7177
*/
7278
@Override
73-
MySqlStatement fetchSize(int rows);
79+
default MySqlStatement fetchSize(int rows) {
80+
require(rows >= 0, "Fetch size must be greater or equal to zero");
81+
return this;
82+
}
7483
}

src/main/java/io/asyncer/r2dbc/mysql/MySqlStatementSupport.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
2020
import org.jetbrains.annotations.Nullable;
2121

22-
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
2322
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonEmpty;
2423
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
2524

@@ -66,12 +65,6 @@ public final MySqlStatement returnGeneratedValues(String... columns) {
6665
return this;
6766
}
6867

69-
@Override
70-
public MySqlStatement fetchSize(int rows) {
71-
require(rows >= 0, "Fetch size must be greater or equal to zero");
72-
return this;
73-
}
74-
7568
@Nullable
7669
final String syntheticKeyName() {
7770
String[] columns = this.generatedColumns;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
import io.asyncer.r2dbc.mysql.codec.Codecs;
20+
import reactor.core.publisher.Flux;
21+
22+
/**
23+
* An implementation of {@link MySqlStatement} considers the lightweight ping syntax.
24+
*/
25+
final class PingStatement implements MySqlStatement {
26+
27+
private final MySqlConnection connection;
28+
29+
private final Codecs codecs;
30+
31+
private final ConnectionContext context;
32+
33+
PingStatement(MySqlConnection connection, Codecs codecs, ConnectionContext context) {
34+
this.connection = connection;
35+
this.codecs = codecs;
36+
this.context = context;
37+
}
38+
39+
@Override
40+
public MySqlStatement add() {
41+
return this;
42+
}
43+
44+
@Override
45+
public MySqlStatement bind(int index, Object value) {
46+
throw new UnsupportedOperationException("Binding parameters is not supported for ping");
47+
}
48+
49+
@Override
50+
public MySqlStatement bind(String name, Object value) {
51+
throw new UnsupportedOperationException("Binding parameters is not supported for ping");
52+
}
53+
54+
@Override
55+
public MySqlStatement bindNull(int index, Class<?> type) {
56+
throw new UnsupportedOperationException("Binding parameters is not supported for ping");
57+
}
58+
59+
@Override
60+
public MySqlStatement bindNull(String name, Class<?> type) {
61+
throw new UnsupportedOperationException("Binding parameters is not supported for ping");
62+
}
63+
64+
@Override
65+
public Flux<MySqlResult> execute() {
66+
return Flux.just(MySqlResult.toResult(false, codecs, context, null,
67+
connection.doPingInternal()));
68+
}
69+
}

src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.jetbrains.annotations.Nullable;
2323
import org.junit.jupiter.api.Test;
2424
import org.junit.jupiter.api.condition.DisabledIf;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.ValueSource;
2527
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
2628
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
2729
import reactor.core.publisher.Flux;
@@ -92,6 +94,26 @@ <T> void testType(Type type, boolean valueSelect, String defined, T... values) {
9294
.concatMap(value -> testOne(connection, type, valueSelect, value.orElse(null)))));
9395
}
9496

97+
@Test
98+
void lightweightPing() {
99+
complete(connection -> Flux.from(connection.createStatement("/* ping */ SELECT 1").execute())
100+
.flatMap(result -> result.map(r -> 1))
101+
.reduce(0, Integer::sum)
102+
.doOnNext(number -> assertThat(number).isEqualTo(0)));
103+
}
104+
105+
@ParameterizedTest
106+
@ValueSource(strings = {
107+
"/* PING */ SELECT 1", "SELECT 1 /* ping*/", " /* ping */ SELECT 1",
108+
"/*ping*/ SELECT 1", "/*to ping or not to ping*/ SELECT 1"
109+
})
110+
void badLightweightPing(String value) {
111+
complete(connection -> Flux.from(connection.createStatement(value).execute())
112+
.flatMap(result -> result.map(r -> r.get(0, Integer.class)))
113+
.reduce(0, Integer::sum)
114+
.doOnNext(number -> assertThat(number).isEqualTo(1)));
115+
}
116+
95117
@Test
96118
void tinyintSigned() {
97119
testType(Byte.class, true, "TINYINT", (byte) 1, (byte) -1, null, Byte.MIN_VALUE, Byte.MAX_VALUE);

0 commit comments

Comments
 (0)