Skip to content

Commit d4a9fe0

Browse files
authored
Implement Lifecycle for MySqlConnection (#171)
Motivation: Implement `Lifecycle` for `MySqlConnection`. See also #64 Modification: The `MySqlConnection`. Result: The `MySqlConnection` implements `Lifecycle` and will rollback in `preRelease`.
1 parent 9073c9d commit d4a9fe0

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.netty.util.internal.logging.InternalLoggerFactory;
3232
import io.r2dbc.spi.Connection;
3333
import io.r2dbc.spi.IsolationLevel;
34+
import io.r2dbc.spi.Lifecycle;
3435
import io.r2dbc.spi.TransactionDefinition;
3536
import io.r2dbc.spi.ValidationDepth;
3637
import org.jetbrains.annotations.Nullable;
@@ -52,7 +53,7 @@
5253
/**
5354
* An implementation of {@link Connection} for connecting to the MySQL database.
5455
*/
55-
public final class MySqlConnection implements Connection, ConnectionState {
56+
public final class MySqlConnection implements Connection, Lifecycle, ConnectionState {
5657

5758
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MySqlConnection.class);
5859

@@ -278,6 +279,17 @@ public MySqlStatement createStatement(String sql) {
278279
return new PrepareParametrizedStatement(client, codecs, query, context, prepareCache);
279280
}
280281

282+
@Override
283+
public Mono<Void> postAllocate() {
284+
return Mono.empty();
285+
}
286+
287+
@Override
288+
public Mono<Void> preRelease() {
289+
// Rollback if the connection is in transaction.
290+
return rollbackTransaction();
291+
}
292+
281293
@Override
282294
public Mono<Void> releaseSavepoint(String name) {
283295
requireValidName(name, "Savepoint name must not be empty and not contain backticks");

src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.time.Duration;
2424
import java.util.Arrays;
25+
import java.util.Collections;
2526

2627
import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
2728
import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED;
@@ -52,6 +53,54 @@ void isInTransaction() {
5253
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse()));
5354
}
5455

56+
@Test
57+
void autoRollbackPreRelease() {
58+
// Mock pool allocate/release.
59+
complete(conn -> conn.postAllocate()
60+
.thenMany(conn.createStatement("CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY)")
61+
.execute())
62+
.flatMap(MySqlResult::getRowsUpdated)
63+
.then(conn.beginTransaction())
64+
.thenMany(conn.createStatement("INSERT INTO test VALUES (1)")
65+
.execute())
66+
.flatMap(MySqlResult::getRowsUpdated)
67+
.single()
68+
.doOnNext(it -> assertThat(it).isEqualTo(1))
69+
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isTrue())
70+
.then(conn.preRelease())
71+
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isFalse())
72+
.then(conn.postAllocate())
73+
.thenMany(conn.createStatement("SELECT * FROM test")
74+
.execute())
75+
.flatMap(it -> it.map((row, metadata) -> row.get(0, Integer.class)))
76+
.count()
77+
.doOnNext(it -> assertThat(it).isZero()));
78+
}
79+
80+
@Test
81+
void shouldNotRollbackCommittedPreRelease() {
82+
// Mock pool allocate/release.
83+
complete(conn -> conn.postAllocate()
84+
.thenMany(conn.createStatement("CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY)")
85+
.execute())
86+
.flatMap(MySqlResult::getRowsUpdated)
87+
.then(conn.beginTransaction())
88+
.thenMany(conn.createStatement("INSERT INTO test VALUES (1)")
89+
.execute())
90+
.flatMap(MySqlResult::getRowsUpdated)
91+
.single()
92+
.doOnNext(it -> assertThat(it).isEqualTo(1))
93+
.then(conn.commitTransaction())
94+
.then(conn.preRelease())
95+
.doOnSuccess(ignored -> assertThat(conn.isInTransaction()).isFalse())
96+
.then(conn.postAllocate())
97+
.thenMany(conn.createStatement("SELECT * FROM test")
98+
.execute())
99+
.flatMap(it -> it.map((row, metadata) -> row.get(0, Integer.class)))
100+
.collectList()
101+
.doOnNext(it -> assertThat(it).isEqualTo(Collections.singletonList(1))));
102+
}
103+
55104
@Test
56105
void transactionDefinitionLockWaitTimeout() {
57106
complete(connection -> connection.beginTransaction(MySqlTransactionDefinition.builder()

0 commit comments

Comments
 (0)